| """ |
| Artifact storage abstraction. |
| |
| The plan calls for a content-addressed artifact store so expensive intermediates |
| can be cached/reused across teacher/audit/training runs and moved between local |
| and remote environments (e.g., RunPod). |
| |
| Design goals: |
| - Local filesystem backend is always available (no extra deps). |
| - S3 backend is optional (requires boto3). |
| - Content addressed by SHA-256 of bytes for deterministic paths. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import json |
| import os |
| import shutil |
| import tempfile |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any, Optional, Protocol, Tuple |
|
|
|
|
def _sha256_bytes(data: bytes) -> str:
    """
    Return the hexadecimal SHA-256 digest of ``data``.
    """
    return hashlib.sha256(data).hexdigest()
|
|
|
|
def _default_layout(digest_hex: str) -> str:
    """
    Map a hex digest to its relative storage path ``<first two chars>/<digest>``.

    The two-character leading directory shards artifacts so that a single
    directory does not end up holding every object.
    """
    shard = digest_hex[:2]
    return "/".join((shard, digest_hex))
|
|
|
|
class ArtifactStore(Protocol):
    """
    Structural interface that pipelines rely on for artifact storage.

    Implementations should hand back an artifact URI string, which can be either:
    - local: "file:///abs/path/to/artifact"
    - s3: "s3://bucket/prefix/..../digest"
    """

    def put_bytes(self, data: bytes, *, ext: str = "") -> str:
        """
        Store raw ``data`` (optionally tagged with file extension ``ext``) and return its URI.
        """
        raise NotImplementedError()

    def put_json(self, obj: Any) -> str:
        """
        Serialize ``obj`` as JSON, store it, and return its URI.
        """
        raise NotImplementedError()

    def put_file(self, src: Path, *, content_addressed: bool = True) -> str:
        """
        Store the file at ``src`` and return its URI.

        ``content_addressed`` selects between addressing by content digest
        (the default) and addressing by some other scheme chosen by the backend.
        """
        raise NotImplementedError()

    def exists(self, uri: str) -> bool:
        """
        Report whether ``uri`` refers to an artifact present in this store.
        """
        raise NotImplementedError()

    def materialize(self, uri: str, dst: Path) -> Path:
        """
        Make the artifact at ``uri`` available on the local filesystem at ``dst``.
        """
        raise NotImplementedError()
|
|
|
|
@dataclass(frozen=True)
class LocalArtifactStoreConfig:
    """
    Immutable settings for the local filesystem artifact store.
    """

    # Directory under which the local store keeps its artifacts.
    root_dir: Path
|
|
|
|
class LocalArtifactStore:
    """
    Filesystem-backed artifact store addressed by ``file://`` URIs.

    Content-addressed artifacts are written below the configured root using the
    layout produced by ``_default_layout``; files stored by name go under
    ``<root>/by_name/``.
    """

    def __init__(self, cfg: LocalArtifactStoreConfig) -> None:
        """
        Resolve ``cfg.root_dir`` (expanding ``~``) and create it if missing.
        """
        self._root = Path(cfg.root_dir).expanduser().resolve()
        self._root.mkdir(parents=True, exist_ok=True)

    def _path_for_digest(self, digest_hex: str, ext: str = "") -> Path:
        """
        Return the on-disk path for ``digest_hex``; a non-empty ``ext`` gets a leading dot if it lacks one.
        """
        rel = _default_layout(digest_hex)
        if ext:
            ext = ext if ext.startswith(".") else f".{ext}"
        return self._root / (rel + ext)

    def _to_uri(self, path: Path) -> str:
        """
        Return the ``file://`` URI of ``path`` (percent-encoded by ``Path.as_uri``).
        """
        return path.resolve().as_uri()

    @staticmethod
    def _path_from_uri(uri: str) -> Path:
        """
        Convert a ``file://`` URI back into a filesystem path.

        ``_to_uri`` percent-encodes characters such as spaces, so the URI must
        be decoded here; stripping the scheme alone would point at a path that
        does not exist.
        """
        from urllib.request import url2pathname

        return Path(url2pathname(uri[len("file://"):]))

    @staticmethod
    def _tmp_sibling(final: Path) -> Path:
        """
        Return a unique staging path in the same directory as ``final``.

        Staging next to the destination keeps ``os.replace`` on one filesystem,
        and the per-call token keeps concurrent writers from sharing a file.
        """
        token = f"{os.getpid()}.{os.urandom(4).hex()}"
        return final.with_name(f"{final.name}.{token}.tmp")

    def put_bytes(self, data: bytes, *, ext: str = "") -> str:
        """
        Store ``data`` under its SHA-256 digest and return the artifact URI.

        Nothing is written when the target path already exists. Otherwise the
        bytes go to a staging file that is moved into place with ``os.replace``.
        """
        digest = _sha256_bytes(data)
        path = self._path_for_digest(digest, ext=ext)
        path.parent.mkdir(parents=True, exist_ok=True)
        if not path.exists():
            tmp = self._tmp_sibling(path)
            try:
                tmp.write_bytes(data)
                os.replace(tmp, path)
            finally:
                # No-op after a successful replace; removes a partial file otherwise.
                tmp.unlink(missing_ok=True)
        return self._to_uri(path)

    def put_json(self, obj: Any) -> str:
        """
        Serialize ``obj`` to indented, key-sorted UTF-8 JSON and store it with a ``.json`` extension.

        Values the JSON encoder cannot handle are converted with ``str``.
        """
        data = json.dumps(obj, indent=2, sort_keys=True, default=str).encode("utf-8")
        return self.put_bytes(data, ext=".json")

    def put_file(self, src: Path, *, content_addressed: bool = True) -> str:
        """
        Store the file at ``src`` and return its URI.

        With ``content_addressed`` the file's bytes are stored by digest, keeping
        all of the source's suffixes as the extension. Otherwise the file is
        copied to ``<root>/by_name/<src.name>``; an existing file of that name is
        left untouched.

        Raises:
            FileNotFoundError: if ``src`` does not exist.
        """
        src = Path(src)
        if not src.exists():
            raise FileNotFoundError(str(src))

        if content_addressed:
            data = src.read_bytes()
            ext = "".join(src.suffixes) or ""
            return self.put_bytes(data, ext=ext)

        dst = (self._root / "by_name" / src.name).resolve()
        dst.parent.mkdir(parents=True, exist_ok=True)
        if dst.exists():
            return self._to_uri(dst)
        # Stage beside the destination: os.replace cannot move a file across
        # filesystems, which a staging directory under the system temp dir may be.
        tmp = self._tmp_sibling(dst)
        try:
            shutil.copy2(src, tmp)
            os.replace(tmp, dst)
        finally:
            tmp.unlink(missing_ok=True)
        return self._to_uri(dst)

    def exists(self, uri: str) -> bool:
        """
        Return True when ``uri`` is a ``file://`` URI whose path exists.

        Non-``file://`` URIs and URIs that cannot be converted to a path yield False.
        """
        if not uri.startswith("file://"):
            return False
        try:
            path = self._path_from_uri(uri)
        except (OSError, ValueError):
            return False
        return path.exists()

    def materialize(self, uri: str, dst: Path) -> Path:
        """
        Copy the artifact at ``uri`` to ``dst`` and return ``dst``.

        Directories are copied recursively. If ``dst`` already exists it is
        returned as-is without copying.

        Raises:
            ValueError: if ``uri`` is not a ``file://`` URI.
        """
        if not uri.startswith("file://"):
            raise ValueError(f"LocalArtifactStore can only materialize file:// URIs, got: {uri}")
        src = self._path_from_uri(uri)
        dst = Path(dst)
        dst.parent.mkdir(parents=True, exist_ok=True)
        if dst.exists():
            return dst
        if src.is_dir():
            shutil.copytree(src, dst)
        else:
            shutil.copy2(src, dst)
        return dst
|
|
|
|
@dataclass(frozen=True)
class S3ArtifactStoreConfig:
    """
    Immutable settings for the S3 artifact store.
    """

    # Bucket that holds the artifacts.
    bucket: str
    # Key prefix placed in front of every artifact key.
    prefix: str = "ylff/artifacts"
    # Region name handed to the boto3 session; None leaves it unset.
    region: Optional[str] = None
    # Endpoint URL handed to the S3 client; None leaves it unset.
    endpoint_url: Optional[str] = None
|
|
|
|
class S3ArtifactStore:
    """
    Optional S3 backend. Requires boto3.

    Content-addressed objects are keyed as ``<prefix>/<layout><ext>`` using the
    layout from ``_default_layout``; files stored by name are keyed as
    ``<prefix>/by_name/<filename>``.
    """

    def __init__(self, cfg: S3ArtifactStoreConfig) -> None:
        """
        Create the S3 client from ``cfg``.

        Raises:
            ImportError: if boto3 cannot be imported.
        """
        try:
            import boto3
        except Exception as e:
            raise ImportError(
                "S3ArtifactStore requires the optional 'boto3' package. "
                "Install with: pip install boto3"
            ) from e

        self._cfg = cfg
        session = boto3.session.Session(region_name=cfg.region)
        self._s3 = session.client("s3", endpoint_url=cfg.endpoint_url)

    def _prefix(self) -> str:
        """
        Return the configured key prefix without leading or trailing slashes.
        """
        return (self._cfg.prefix or "").strip("/")

    @staticmethod
    def _sha256_file(path: Path, chunk_size: int = 1 << 20) -> str:
        """
        Return the hex SHA-256 of the file at ``path``, read in ``chunk_size`` pieces.
        """
        hasher = hashlib.sha256()
        with path.open("rb") as fh:
            for chunk in iter(lambda: fh.read(chunk_size), b""):
                hasher.update(chunk)
        return hasher.hexdigest()

    def _key_for_digest(self, digest_hex: str, ext: str = "") -> str:
        """
        Return the object key for ``digest_hex``; a non-empty ``ext`` gets a leading dot if it lacks one.
        """
        rel = _default_layout(digest_hex)
        if ext:
            ext = ext if ext.startswith(".") else f".{ext}"
        prefix = self._prefix()
        return f"{prefix}/{rel}{ext}" if prefix else f"{rel}{ext}"

    def _uri(self, key: str) -> str:
        """
        Return the ``s3://`` URI for ``key`` in the configured bucket.
        """
        return f"s3://{self._cfg.bucket}/{key}"

    def put_bytes(self, data: bytes, *, ext: str = "") -> str:
        """
        Upload ``data`` under its SHA-256 digest and return the artifact URI.

        The object is uploaded unconditionally, without checking for an existing key.
        """
        digest = _sha256_bytes(data)
        key = self._key_for_digest(digest, ext=ext)
        self._s3.put_object(Bucket=self._cfg.bucket, Key=key, Body=data)
        return self._uri(key)

    def put_json(self, obj: Any) -> str:
        """
        Serialize ``obj`` to indented, key-sorted UTF-8 JSON and store it with a ``.json`` extension.

        Values the JSON encoder cannot handle are converted with ``str``.
        """
        data = json.dumps(obj, indent=2, sort_keys=True, default=str).encode("utf-8")
        return self.put_bytes(data, ext=".json")

    def put_file(self, src: Path, *, content_addressed: bool = True) -> str:
        """
        Upload the file at ``src`` and return its URI.

        With ``content_addressed`` the key is derived from the file's SHA-256
        digest, keeping all of the source's suffixes as the extension; otherwise
        the key is ``<prefix>/by_name/<src.name>``.

        Raises:
            FileNotFoundError: if ``src`` does not exist.
        """
        src = Path(src)
        if not src.exists():
            raise FileNotFoundError(str(src))

        if content_addressed:
            # Hash in chunks and hand the path to upload_file instead of loading
            # the whole file into memory for a single put_object request, which
            # S3 limits in size; the resulting key and URI are the same.
            digest = self._sha256_file(src)
            key = self._key_for_digest(digest, ext="".join(src.suffixes))
        else:
            key = f"{self._prefix()}/by_name/{src.name}".strip("/")

        self._s3.upload_file(str(src), self._cfg.bucket, key)
        return self._uri(key)

    def exists(self, uri: str) -> bool:
        """
        Return True when a HEAD request for the object at ``uri`` succeeds.

        Non-``s3://`` URIs yield False. Any exception raised by the HEAD
        request is reported as False, not only "not found".

        Raises:
            ValueError: if ``uri`` starts with ``s3://`` but lacks a bucket or key.
        """
        if not uri.startswith("s3://"):
            return False
        bucket, key = _parse_s3_uri(uri)
        try:
            self._s3.head_object(Bucket=bucket, Key=key)
            return True
        except Exception:
            # NOTE(review): deliberately best-effort; this also hides
            # credential and connectivity failures from the caller.
            return False

    def materialize(self, uri: str, dst: Path) -> Path:
        """
        Download the object at ``uri`` to ``dst`` and return ``dst``.

        Raises:
            ValueError: if ``uri`` is not a valid ``s3://`` URI.
        """
        if not uri.startswith("s3://"):
            raise ValueError(f"S3ArtifactStore can only materialize s3:// URIs, got: {uri}")
        bucket, key = _parse_s3_uri(uri)
        dst = Path(dst)
        dst.parent.mkdir(parents=True, exist_ok=True)
        self._s3.download_file(bucket, key, str(dst))
        return dst
|
|
|
|
def _parse_s3_uri(uri: str) -> Tuple[str, str]:
    """
    Split an ``s3://bucket/key`` URI into ``(bucket, key)``.

    The key is everything after the first ``/`` that follows the bucket name.

    Raises:
        ValueError: if ``uri`` does not start with ``s3://`` or has an empty
            bucket or key.
    """
    scheme = "s3://"
    # Check the scheme explicitly: slicing a fixed number of characters would
    # otherwise accept any URI whose scheme happens to be the same length.
    if not uri.startswith(scheme):
        raise ValueError(f"Invalid s3 uri: {uri}")
    bucket, _, key = uri[len(scheme):].partition("/")
    if not bucket or not key:
        raise ValueError(f"Invalid s3 uri: {uri}")
    return bucket, key
|
|
|
|
def build_artifact_store(
    *,
    backend: str = "local",
    root_dir: Optional[Path] = None,
    s3_bucket: Optional[str] = None,
    s3_prefix: str = "ylff/artifacts",
    s3_region: Optional[str] = None,
    s3_endpoint_url: Optional[str] = None,
) -> ArtifactStore:
    """
    Construct the artifact store selected by ``backend``.

    ``backend`` is matched case-insensitively after trimming whitespace; an
    empty value means "local". "local", "fs", "filesystem" and "file" select
    the filesystem store rooted at ``root_dir`` (``data/artifacts`` when not
    given); "s3" selects the S3 store built from the ``s3_*`` arguments.

    Raises:
        ValueError: if ``backend`` is not recognised, or if it is "s3" and
            ``s3_bucket`` is empty.
    """
    kind = (backend or "local").lower().strip()

    if kind == "s3":
        if not s3_bucket:
            raise ValueError("s3_bucket is required when backend='s3'")
        s3_cfg = S3ArtifactStoreConfig(
            bucket=s3_bucket,
            prefix=s3_prefix,
            region=s3_region,
            endpoint_url=s3_endpoint_url,
        )
        return S3ArtifactStore(s3_cfg)

    if kind in ("local", "fs", "filesystem", "file"):
        local_root = Path(root_dir) if root_dir else Path("data/artifacts")
        local_cfg = LocalArtifactStoreConfig(root_dir=local_root)
        return LocalArtifactStore(local_cfg)

    raise ValueError(f"Unknown artifact store backend: {kind}")
|
|
|
|
# Fallback store handed out when an app carries no store of its own.
# Built at import time; constructing LocalArtifactStore creates its root
# directory, so importing this module creates "data/artifacts" if missing.
_default_store_cfg = LocalArtifactStoreConfig(root_dir=Path("data/artifacts"))
_default_store: ArtifactStore = LocalArtifactStore(_default_store_cfg)
|
|
|
|
def get_artifact_store(app: Any) -> ArtifactStore:
    """
    Return the artifact store attached to a FastAPI app, or the module default.

    The store is looked up at ``app.state.artifact_store``. If ``app`` has no
    ``state``, or the state has no ``artifact_store`` (or it is None), the
    module-level local store is returned instead.
    """
    state = getattr(app, "state", None)
    configured = getattr(state, "artifact_store", None)
    if configured is None:
        return _default_store
    return configured
|
|