| from __future__ import annotations | |
| import hashlib | |
| from pathlib import Path | |
| class ObjectStore: | |
| def __init__(self, root: Path): | |
| self.root = root | |
| self.root.mkdir(parents=True, exist_ok=True) | |
| def put_bytes(self, data: bytes, suffix: str = "") -> tuple[str, str]: | |
| digest = hashlib.sha256(data).hexdigest() | |
| key = f"{digest[:2]}/{digest}{suffix}" | |
| path = self.root / key | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| if not path.exists(): | |
| path.write_bytes(data) | |
| return f"local://{key}", digest | |
| def get_bytes(self, uri: str) -> bytes: | |
| if not uri.startswith("local://"): | |
| raise ValueError(f"Unsupported object URI: {uri}") | |
| return (self.root / uri.removeprefix("local://")).read_bytes() | |
| def path_for(self, uri: str) -> Path: | |
| if not uri.startswith("local://"): | |
| raise ValueError(f"Unsupported object URI: {uri}") | |
| return self.root / uri.removeprefix("local://") | |