iris-at-text2sparql / scripts /export_dbpedia_graph_partitions.py
Alex Latipov
Add clean HF snapshot deployment path
cfd076a
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import itertools
from pathlib import Path
import requests
# SPARQL endpoint URL used when --endpoint is not given on the command line.
DEFAULT_ENDPOINT = "http://127.0.0.1:8890/sparql"
# Graph IRI exported when --graph-uri is not given on the command line.
DEFAULT_GRAPH = "http://dbpedia.org"
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse the process arguments.

    Returns:
        A namespace with ``endpoint``, ``graph_uri``, ``output_dir``,
        ``prefixes`` (``None`` when the flag is absent), ``overwrite`` and
        ``timeout_sec``.
    """
    cli = argparse.ArgumentParser(
        description="Export a Virtuoso graph into deterministic RDF partitions by MD5(subject) prefix."
    )

    # Plain string options that only differ by flag name and default value.
    for flag, fallback in (("--endpoint", DEFAULT_ENDPOINT), ("--graph-uri", DEFAULT_GRAPH)):
        cli.add_argument(flag, default=fallback)

    cli.add_argument("--output-dir", required=True)

    prefix_help = "Hex prefixes to export, e.g. 00 01 ff. Default is all 256 two-hex-digit prefixes."
    cli.add_argument("--prefixes", nargs="*", help=prefix_help)

    cli.add_argument("--overwrite", action="store_true")
    cli.add_argument("--timeout-sec", type=int, default=1800)

    namespace = cli.parse_args()
    return namespace
def all_prefixes() -> list[str]:
    """Return all 256 two-digit lowercase hex strings, "00" through "ff", in ascending order."""
    return [format(byte, "02x") for byte in range(256)]
def build_query(graph_uri: str, prefix: str) -> str:
    """Build the CONSTRUCT query that selects one partition of a graph.

    A triple belongs to the partition when the hex MD5 digest of its
    subject, rendered as a string, starts with ``prefix``.

    Args:
        graph_uri: IRI of the named graph to read from. It is placed between
            ``<`` and ``>`` in the query, so it must not contain characters
            that are illegal inside a SPARQL IRI reference.
        prefix: One or more hex digits. It is lower-cased before use because
            SPARQL ``MD5`` yields lower-case hex digits, so an upper-case
            prefix could never match.

    Returns:
        The SPARQL query text.

    Raises:
        ValueError: If ``graph_uri`` contains a character that would end or
            corrupt the ``<...>`` IRI reference, or if ``prefix`` is empty or
            contains a non-hex character.
    """
    # Both values are spliced into the query text verbatim; reject anything
    # that could terminate the IRI or the string literal and alter the query.
    illegal_iri_chars = set('<>"{}|^`\\')
    if any(ch in illegal_iri_chars or ch <= " " for ch in graph_uri):
        raise ValueError(f"Invalid graph URI: {graph_uri!r}")
    if not prefix or any(ch not in "0123456789abcdefABCDEF" for ch in prefix):
        raise ValueError(f"Invalid prefix: {prefix}")

    prefix = prefix.lower()
    # Compare exactly as many leading digest characters as the prefix has,
    # so prefixes of a length other than two do not silently match nothing.
    return "\n".join(
        (
            "CONSTRUCT { ?s ?p ?o }",
            "WHERE {",
            f"GRAPH <{graph_uri}> {{",
            "?s ?p ?o .",
            f'FILTER (SUBSTR(MD5(STR(?s)), 1, {len(prefix)}) = "{prefix}")',
            "}",
            "}",
        )
    )
def export_partition(
    endpoint: str,
    graph_uri: str,
    prefix: str,
    output_path: Path,
    timeout_sec: int,
) -> None:
    """Stream one partition from the SPARQL endpoint into ``output_path``.

    The response is first written to a sibling ``<name>.part`` file and only
    renamed to ``output_path`` once the whole body has been received. A
    failed or interrupted download therefore never leaves a truncated file
    at ``output_path``.

    Args:
        endpoint: URL of the SPARQL endpoint to query with HTTP GET.
        graph_uri: Graph IRI passed to ``build_query``.
        prefix: Partition prefix passed to ``build_query``.
        output_path: Final destination file; replaced if it already exists.
        timeout_sec: Timeout handed to ``requests.get``.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.RequestException: On connection or read failures.
        OSError: If the temporary or final file cannot be written.
    """
    params = {
        "query": build_query(graph_uri, prefix),
        # NOTE(review): presumably selects N-Triples output on Virtuoso,
        # matching the ".nt" files written by main() - confirm.
        "format": "text/plain",
    }
    partial_path = output_path.with_name(output_path.name + ".part")
    try:
        with requests.get(endpoint, params=params, stream=True, timeout=timeout_sec) as response:
            response.raise_for_status()
            with partial_path.open("wb") as handle:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    if chunk:
                        handle.write(chunk)
        # Publish the finished download under its final name in one step.
        partial_path.replace(output_path)
    except BaseException:
        # Also covers Ctrl-C: drop the incomplete download, then re-raise.
        partial_path.unlink(missing_ok=True)
        raise
def main() -> int:
    """Export every requested partition of the graph to the output directory.

    Each partition is written to ``dbpedia_graph_<prefix>.nt`` inside the
    output directory. A partition whose file already exists and is non-empty
    is skipped unless ``--overwrite`` is given.

    Returns:
        ``0`` once all requested partitions have been exported or skipped.

    Raises:
        ValueError: If any requested prefix is not exactly two hex digits.
            All prefixes are checked before the first export starts.
    """
    args = parse_args()
    output_dir = Path(args.output_dir)

    # Validate and normalise up front: a single export can run for a long
    # time, so a typo late in the list should fail before any work is done.
    normalised: list[str] = []
    for raw in args.prefixes or all_prefixes():
        if len(raw) != 2 or any(ch not in "0123456789abcdefABCDEF" for ch in raw):
            raise ValueError(f"Invalid prefix: {raw}")
        normalised.append(raw.lower())
    # dict.fromkeys removes duplicates (e.g. "AA" and "aa") but keeps order.
    prefixes = list(dict.fromkeys(normalised))

    output_dir.mkdir(parents=True, exist_ok=True)
    for prefix in prefixes:
        output_path = output_dir / f"dbpedia_graph_{prefix}.nt"
        if output_path.exists() and output_path.stat().st_size > 0 and not args.overwrite:
            print(f"[skip] {output_path}")
            continue
        print(f"[export] {prefix} -> {output_path}")
        export_partition(args.endpoint, args.graph_uri, prefix, output_path, args.timeout_sec)
    return 0
# Run the exporter only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)