github-actions[bot]
Auto-sync from demo at Tue Dec 16 08:21:05 UTC 2025
31086ae
raw
history blame
4.04 kB
from pathlib import Path
from typing import Any, Dict, List, Union
import ray
import rdflib
from ray.data import Dataset
from rdflib import Literal
from rdflib.util import guess_format
from graphgen.bases.base_reader import BaseReader
from graphgen.utils import logger
class RDFReader(BaseReader):
"""
Reader for RDF files that extracts triples and represents them as dictionaries.
Uses Ray Data for distributed processing of multiple RDF files.
"""
def __init__(self, *, text_column: str = "content", **kwargs):
"""
Initialize RDFReader.
:param text_column: The column name for text content (default: "content").
"""
super().__init__(**kwargs)
self.text_column = text_column
def read(
self,
input_path: Union[str, List[str]],
) -> Dataset:
"""
Read RDF file(s) using Ray Data.
:param input_path: Path to RDF file or list of RDF files.
:return: Ray Dataset containing extracted documents.
"""
if not ray.is_initialized():
ray.init()
# Ensure input_path is a list to prevent Ray from splitting string into characters
if isinstance(input_path, str):
input_path = [input_path]
# Create dataset from file paths
paths_ds = ray.data.from_items(input_path)
def process_rdf(row: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Process a single RDF file and return list of documents."""
try:
file_path = row["item"]
return self._parse_rdf_file(Path(file_path))
except Exception as e:
logger.error(
"Failed to process RDF file %s: %s", row.get("item", "unknown"), e
)
return []
# Process files in parallel and flatten results
docs_ds = paths_ds.flat_map(process_rdf)
# Filter valid documents
docs_ds = docs_ds.filter(self._should_keep_item)
return docs_ds
def _parse_rdf_file(self, file_path: Path) -> List[Dict[str, Any]]:
"""
Parse a single RDF file and extract documents.
:param file_path: Path to RDF file.
:return: List of document dictionaries.
"""
if not file_path.is_file():
raise FileNotFoundError(f"RDF file not found: {file_path}")
g = rdflib.Graph()
fmt = guess_format(str(file_path))
try:
g.parse(str(file_path), format=fmt)
except Exception as e:
raise ValueError(f"Cannot parse RDF file {file_path}: {e}") from e
docs: List[Dict[str, Any]] = []
# Process each unique subject in the RDF graph
for subj in set(g.subjects()):
literals = []
props = {}
# Extract all triples for this subject
for _, pred, obj in g.triples((subj, None, None)):
pred_str = str(pred)
obj_str = str(obj)
# Collect literal values as text content
if isinstance(obj, Literal):
literals.append(obj_str)
# Store all properties (including non-literals)
props.setdefault(pred_str, []).append(obj_str)
# Join all literal values as the text content
text = " ".join(literals).strip()
if not text:
logger.warning(
"Subject %s in %s has no literal values; document will have empty '%s' field.",
subj,
file_path,
self.text_column,
)
# Create document dictionary
doc = {
"id": str(subj),
self.text_column: text,
"properties": props,
"source_file": str(file_path),
}
docs.append(doc)
if not docs:
logger.warning("RDF file %s contains no valid documents.", file_path)
return docs