Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| import itertools | |
| from pathlib import Path | |
| import requests | |
# SPARQL endpoint URL used when --endpoint is not supplied on the command line.
DEFAULT_ENDPOINT: str = "http://127.0.0.1:8890/sparql"

# Named-graph IRI used when --graph-uri is not supplied on the command line.
DEFAULT_GRAPH: str = "http://dbpedia.org"
def parse_args() -> argparse.Namespace:
    """Parse the exporter's command-line options from ``sys.argv``.

    Returns:
        The ``argparse`` namespace with the attributes ``endpoint``,
        ``graph_uri``, ``output_dir`` (required), ``prefixes`` (a list, or
        ``None`` when the option is absent), ``overwrite`` (bool) and
        ``timeout_sec`` (int, default 1800).
    """
    cli = argparse.ArgumentParser(
        description="Export a Virtuoso graph into deterministic RDF partitions by MD5(subject) prefix."
    )

    # (flag, keyword arguments) pairs, registered in this order so the
    # generated --help output lists the options in the same sequence.
    option_specs = (
        ("--endpoint", {"default": DEFAULT_ENDPOINT}),
        ("--graph-uri", {"default": DEFAULT_GRAPH}),
        ("--output-dir", {"required": True}),
        (
            "--prefixes",
            {
                "nargs": "*",
                "help": "Hex prefixes to export, e.g. 00 01 ff. Default is all 256 two-hex-digit prefixes.",
            },
        ),
        ("--overwrite", {"action": "store_true"}),
        ("--timeout-sec", {"type": int, "default": 1800}),
    )
    for flag, options in option_specs:
        cli.add_argument(flag, **options)

    return cli.parse_args()
def all_prefixes() -> list[str]:
    """Return all 256 two-digit lowercase hex strings, "00" through "ff".

    The values are in ascending order, i.e. the same order as iterating the
    first digit in the outer position and the second digit in the inner one.
    """
    return [format(value, "02x") for value in range(256)]
def build_query(graph_uri: str, prefix: str) -> str:
    """Build the SPARQL CONSTRUCT query that selects one partition.

    The query returns every triple of the named graph whose subject, rendered
    with ``STR`` and hashed with ``MD5``, has a digest whose first two
    characters equal ``prefix``.

    Both arguments are interpolated verbatim; no escaping or validation is
    performed here.

    Args:
        graph_uri: Graph IRI, placed between ``<`` and ``>`` in the query.
        prefix: Text compared against the first two digest characters.

    Returns:
        The query text without leading or trailing whitespace.
    """
    query_lines = (
        "CONSTRUCT { ?s ?p ?o }",
        "WHERE {",
        f"GRAPH <{graph_uri}> {{",
        "?s ?p ?o .",
        f'FILTER (SUBSTR(MD5(STR(?s)), 1, 2) = "{prefix}")',
        "}",
        "}",
    )
    return "\n".join(query_lines)
def export_partition(
    endpoint: str,
    graph_uri: str,
    prefix: str,
    output_path: Path,
    timeout_sec: int,
) -> None:
    """Download one partition from the SPARQL endpoint into ``output_path``.

    The response body is streamed to a sibling ``<name>.part`` file and only
    renamed onto ``output_path`` once the whole body has been written. A
    failed or interrupted download therefore never leaves a truncated file at
    ``output_path`` (which a later run could mistake for a finished partition)
    and never clobbers a previously exported file.

    Args:
        endpoint: URL of the SPARQL endpoint, queried with HTTP GET.
        graph_uri: IRI of the graph to export from.
        prefix: Two-character hex prefix selecting the partition.
        output_path: Final destination of the downloaded data.
        timeout_sec: Timeout handed to ``requests``. It bounds connecting and
            each wait for data, not the duration of the whole download.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        requests.RequestException: On connection problems or timeouts.
        OSError: If the partial file cannot be written or renamed.
    """
    params = {
        "query": build_query(graph_uri, prefix),
        # Requested result serialization; callers store the result as ".nt",
        # so this is presumably N-Triples on the target server - confirm.
        "format": "text/plain",
    }
    partial_path = output_path.with_name(output_path.name + ".part")
    try:
        with requests.get(endpoint, params=params, stream=True, timeout=timeout_sec) as response:
            response.raise_for_status()
            with partial_path.open("wb") as handle:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    if chunk:
                        handle.write(chunk)
        # Reached only after the full body was written and the file closed.
        partial_path.replace(output_path)
    finally:
        # After a successful rename the partial file is gone and this is a
        # no-op; after any failure it removes the incomplete download.
        if partial_path.exists():
            partial_path.unlink()
def _normalize_prefix(prefix: str) -> str:
    """Return ``prefix`` lower-cased, raising ``ValueError`` unless it is two hex digits."""
    if len(prefix) != 2 or any(ch not in "0123456789abcdefABCDEF" for ch in prefix):
        raise ValueError(f"Invalid prefix: {prefix}")
    return prefix.lower()


def main() -> int:
    """Export the requested partitions of the graph, one file per prefix.

    Each prefix is written to ``dbpedia_graph_<prefix>.nt`` inside the output
    directory. An existing non-empty file is skipped unless ``--overwrite``
    was given.

    Returns:
        ``0`` once every requested prefix has been exported or skipped.

    Raises:
        ValueError: If any requested prefix is not exactly two hex digits.
            All prefixes are checked before the output directory is created
            or any request is sent, so a typo late in the list cannot abort a
            run after earlier partitions were already downloaded.
    """
    args = parse_args()

    # An absent or empty --prefixes list means "export everything".
    prefixes = [_normalize_prefix(raw) for raw in (args.prefixes or all_prefixes())]

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    for prefix in prefixes:
        output_path = output_dir / f"dbpedia_graph_{prefix}.nt"
        # A zero-byte file is treated as missing and exported again.
        if output_path.exists() and output_path.stat().st_size > 0 and not args.overwrite:
            print(f"[skip] {output_path}")
            continue
        print(f"[export] {prefix} -> {output_path}")
        export_partition(args.endpoint, args.graph_uri, prefix, output_path, args.timeout_sec)
    return 0
# Script entry point: run the exporter and exit with main()'s return value.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)