#!/usr/bin/env python3 from __future__ import annotations import argparse import itertools from pathlib import Path import requests DEFAULT_ENDPOINT = "http://127.0.0.1:8890/sparql" DEFAULT_GRAPH = "http://dbpedia.org" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Export a Virtuoso graph into deterministic RDF partitions by MD5(subject) prefix." ) parser.add_argument("--endpoint", default=DEFAULT_ENDPOINT) parser.add_argument("--graph-uri", default=DEFAULT_GRAPH) parser.add_argument("--output-dir", required=True) parser.add_argument( "--prefixes", nargs="*", help="Hex prefixes to export, e.g. 00 01 ff. Default is all 256 two-hex-digit prefixes.", ) parser.add_argument("--overwrite", action="store_true") parser.add_argument("--timeout-sec", type=int, default=1800) return parser.parse_args() def all_prefixes() -> list[str]: return [a + b for a, b in itertools.product("0123456789abcdef", repeat=2)] def build_query(graph_uri: str, prefix: str) -> str: return f""" CONSTRUCT {{ ?s ?p ?o }} WHERE {{ GRAPH <{graph_uri}> {{ ?s ?p ?o . FILTER (SUBSTR(MD5(STR(?s)), 1, 2) = "{prefix}") }} }} """.strip() def export_partition( endpoint: str, graph_uri: str, prefix: str, output_path: Path, timeout_sec: int, ) -> None: params = { "query": build_query(graph_uri, prefix), "format": "text/plain", } with requests.get(endpoint, params=params, stream=True, timeout=timeout_sec) as response: response.raise_for_status() with output_path.open("wb") as handle: for chunk in response.iter_content(chunk_size=1 << 20): if chunk: handle.write(chunk) def main() -> int: args = parse_args() output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) prefixes = args.prefixes or all_prefixes() for prefix in prefixes: if len(prefix) != 2 or any(ch not in "0123456789abcdefABCDEF" for ch in prefix): raise ValueError(f"Invalid prefix: {prefix}") prefix = prefix.lower() output_path = output_dir / f"dbpedia_graph_{prefix}.nt" if output_path.exists() and output_path.stat().st_size > 0 and not args.overwrite: print(f"[skip] {output_path}") continue print(f"[export] {prefix} -> {output_path}") export_partition(args.endpoint, args.graph_uri, prefix, output_path, args.timeout_sec) return 0 if __name__ == "__main__": raise SystemExit(main())