# 3d_model/ylff/utils/artifact_store.py
# Author: Azan
# Clean deployment build (Squashed) — commit 7a87926
"""
Artifact storage abstraction.
The plan calls for a content-addressed artifact store so expensive intermediates
can be cached/reused across teacher/audit/training runs and moved between local
and remote environments (e.g., RunPod).
Design goals:
- Local filesystem backend is always available (no extra deps).
- S3 backend is optional (requires boto3).
- Content addressed by SHA-256 of bytes for deterministic paths.
"""
from __future__ import annotations

import hashlib
import json
import os
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional, Protocol, Tuple
from urllib.parse import unquote, urlparse
def _sha256_bytes(data: bytes) -> str:
h = hashlib.sha256()
h.update(data)
return h.hexdigest()
def _default_layout(digest_hex: str) -> str:
# Avoid huge single directories; common CAS layout.
return f"{digest_hex[:2]}/{digest_hex}"
class ArtifactStore(Protocol):
    """
    Minimal interface used by pipelines.
    Implementations should return an artifact URI string, which can be either:
    - local: "file:///abs/path/to/artifact"
    - s3: "s3://bucket/prefix/..../digest"
    """
    def put_bytes(self, data: bytes, *, ext: str = "") -> str:
        """Store raw bytes (with an optional file extension); return a URI."""
        raise NotImplementedError
    def put_json(self, obj: Any) -> str:
        """Serialize *obj* as JSON and store it; return a URI."""
        raise NotImplementedError
    def put_file(self, src: Path, *, content_addressed: bool = True) -> str:
        """Store an existing file, content-addressed by default; return a URI."""
        raise NotImplementedError
    def exists(self, uri: str) -> bool:
        """Return True if *uri* refers to an artifact this store can resolve."""
        raise NotImplementedError
    def materialize(self, uri: str, dst: Path) -> Path:
        """Copy the artifact behind *uri* to local path *dst*; return it."""
        raise NotImplementedError
@dataclass(frozen=True)
class LocalArtifactStoreConfig:
    """Configuration for LocalArtifactStore."""
    # Directory under which all artifacts are written (created if missing).
    root_dir: Path
class LocalArtifactStore:
    """
    Filesystem-backed artifact store rooted at ``cfg.root_dir``.

    Content-addressed payloads live under ``<root>/<digest[:2]>/<digest><ext>``
    (same shape as the module-level ``_default_layout``). URIs are ``file://``
    URIs as produced by :meth:`pathlib.Path.as_uri`, which percent-encodes
    spaces and non-ASCII characters — ``exists``/``materialize`` decode them
    accordingly.
    """

    def __init__(self, cfg: LocalArtifactStoreConfig) -> None:
        self._root = Path(cfg.root_dir).expanduser().resolve()
        self._root.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _uri_to_path(uri: str) -> Path:
        # Path.as_uri() percent-encodes (e.g. " " -> "%20"); a naive
        # str.replace("file://", "") would not round-trip such paths, so
        # parse the URI and unquote the path component.
        return Path(unquote(urlparse(uri).path))

    def _path_for_digest(self, digest_hex: str, ext: str = "") -> Path:
        # Shard into <first-two-hex>/<digest> (mirrors _default_layout) to
        # avoid huge single directories.
        rel = f"{digest_hex[:2]}/{digest_hex}"
        if ext and not ext.startswith("."):
            ext = f".{ext}"
        return self._root / (rel + ext)

    def _to_uri(self, path: Path) -> str:
        return path.resolve().as_uri()

    def put_bytes(self, data: bytes, *, ext: str = "") -> str:
        """Store raw bytes; idempotent for identical content. Returns a URI."""
        digest = hashlib.sha256(data).hexdigest()
        path = self._path_for_digest(digest, ext=ext)
        path.parent.mkdir(parents=True, exist_ok=True)
        if not path.exists():
            # Unique temp file in the destination directory so os.replace is
            # an atomic same-filesystem rename; concurrent writers of the
            # same digest are harmless (identical content either way).
            fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
            try:
                with os.fdopen(fd, "wb") as f:
                    f.write(data)
                os.replace(tmp_name, path)
            finally:
                # Clean up if the replace did not consume the temp file.
                if os.path.exists(tmp_name):
                    os.unlink(tmp_name)
        return self._to_uri(path)

    def put_json(self, obj: Any) -> str:
        """Serialize *obj* to canonical JSON and store it with a .json ext."""
        data = json.dumps(obj, indent=2, sort_keys=True, default=str).encode("utf-8")
        return self.put_bytes(data, ext=".json")

    def put_file(self, src: Path, *, content_addressed: bool = True) -> str:
        """
        Store a file. Content-addressed by default (full suffix chain kept,
        e.g. ".tar.gz"); otherwise copied under ``<root>/by_name/<name>``,
        where an existing copy is reused as-is.

        Raises FileNotFoundError if *src* does not exist.
        """
        src = Path(src)
        if not src.exists():
            raise FileNotFoundError(str(src))
        if content_addressed:
            data = src.read_bytes()
            ext = "".join(src.suffixes) or ""
            return self.put_bytes(data, ext=ext)
        # Non-content-addressed: copy under root preserving name (still isolated).
        dst = (self._root / "by_name" / src.name).resolve()
        dst.parent.mkdir(parents=True, exist_ok=True)
        if dst.exists():
            return self._to_uri(dst)
        # Stage next to the destination so os.replace never crosses a
        # filesystem boundary (system temp may live on another mount).
        tmp_dir = Path(tempfile.mkdtemp(prefix="ylff_artifact_", dir=dst.parent))
        try:
            tmp = tmp_dir / src.name
            shutil.copy2(src, tmp)
            os.replace(tmp, dst)  # atomic publish
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)
        return self._to_uri(dst)

    def exists(self, uri: str) -> bool:
        """True iff *uri* is a file:// URI pointing at an existing path."""
        if not uri.startswith("file://"):
            return False
        try:
            path = self._uri_to_path(uri)
        except Exception:
            return False
        return path.exists()

    def materialize(self, uri: str, dst: Path) -> Path:
        """
        Copy the artifact behind *uri* to *dst* and return *dst*.

        An existing *dst* is trusted (content-addressed data is immutable)
        and returned untouched. Directories are copied recursively.
        Raises ValueError for non-file:// URIs.
        """
        if not uri.startswith("file://"):
            raise ValueError(f"LocalArtifactStore can only materialize file:// URIs, got: {uri}")
        src = self._uri_to_path(uri)
        dst = Path(dst)
        dst.parent.mkdir(parents=True, exist_ok=True)
        if dst.exists():
            return dst
        if src.is_dir():
            shutil.copytree(src, dst)
        else:
            shutil.copy2(src, dst)
        return dst
@dataclass(frozen=True)
class S3ArtifactStoreConfig:
    """Configuration for S3ArtifactStore."""
    bucket: str  # target S3 bucket name
    prefix: str = "ylff/artifacts"  # key prefix under which artifacts are stored
    region: Optional[str] = None  # AWS region; boto3's default chain when None
    endpoint_url: Optional[str] = None  # for S3-compatible stores
class S3ArtifactStore:
    """
    Optional S3 backend. Requires boto3.

    Keys follow the shared content-addressed layout under the configured
    prefix; URIs are returned as ``s3://<bucket>/<key>``.
    """

    def __init__(self, cfg: S3ArtifactStoreConfig) -> None:
        try:
            import boto3  # type: ignore
        except Exception as e:  # pragma: no cover
            raise ImportError(
                "S3ArtifactStore requires the optional 'boto3' package. "
                "Install with: pip install boto3"
            ) from e
        self._cfg = cfg
        session = boto3.session.Session(region_name=cfg.region)
        self._s3 = session.client("s3", endpoint_url=cfg.endpoint_url)

    def _key_for_digest(self, digest_hex: str, ext: str = "") -> str:
        # Normalize the extension to a leading dot, then prepend the
        # (slash-trimmed) prefix when one is configured.
        suffix = "" if not ext else (ext if ext.startswith(".") else f".{ext}")
        rel = _default_layout(digest_hex) + suffix
        prefix = (self._cfg.prefix or "").strip("/")
        if not prefix:
            return rel
        return f"{prefix}/{rel}"

    def _uri(self, key: str) -> str:
        return f"s3://{self._cfg.bucket}/{key}"

    def put_bytes(self, data: bytes, *, ext: str = "") -> str:
        """Upload bytes under a digest-derived key; returns an s3:// URI."""
        key = self._key_for_digest(_sha256_bytes(data), ext=ext)
        # Re-uploading identical content to the same key is harmless, so no
        # existence check is made before the put.
        self._s3.put_object(Bucket=self._cfg.bucket, Key=key, Body=data)
        return self._uri(key)

    def put_json(self, obj: Any) -> str:
        """Serialize *obj* to canonical JSON and upload it with a .json ext."""
        data = json.dumps(obj, indent=2, sort_keys=True, default=str).encode("utf-8")
        return self.put_bytes(data, ext=".json")

    def put_file(self, src: Path, *, content_addressed: bool = True) -> str:
        """Upload a file; content-addressed by default, else under by_name/."""
        src = Path(src)
        if not src.exists():
            raise FileNotFoundError(str(src))
        if not content_addressed:
            key = f"{(self._cfg.prefix or '').strip('/')}/by_name/{src.name}".strip("/")
            self._s3.upload_file(str(src), self._cfg.bucket, key)
            return self._uri(key)
        return self.put_bytes(src.read_bytes(), ext="".join(src.suffixes) or "")

    def exists(self, uri: str) -> bool:
        """Best-effort HEAD on the object; any failure reads as absent."""
        if not uri.startswith("s3://"):
            return False
        bucket, key = _parse_s3_uri(uri)
        try:
            self._s3.head_object(Bucket=bucket, Key=key)
        except Exception:
            return False
        return True

    def materialize(self, uri: str, dst: Path) -> Path:
        """Download the object behind *uri* to *dst* and return *dst*."""
        if not uri.startswith("s3://"):
            raise ValueError(f"S3ArtifactStore can only materialize s3:// URIs, got: {uri}")
        bucket, key = _parse_s3_uri(uri)
        target = Path(dst)
        target.parent.mkdir(parents=True, exist_ok=True)
        self._s3.download_file(bucket, key, str(target))
        return target
def _parse_s3_uri(uri: str) -> Tuple[str, str]:
# s3://bucket/key...
s = uri[len("s3://") :]
parts = s.split("/", 1)
if len(parts) != 2 or not parts[0] or not parts[1]:
raise ValueError(f"Invalid s3 uri: {uri}")
return parts[0], parts[1]
def build_artifact_store(
    *,
    backend: str = "local",
    root_dir: Optional[Path] = None,
    s3_bucket: Optional[str] = None,
    s3_prefix: str = "ylff/artifacts",
    s3_region: Optional[str] = None,
    s3_endpoint_url: Optional[str] = None,
) -> ArtifactStore:
    """Factory for an artifact store backend.

    ``backend`` accepts "local"/"fs"/"filesystem"/"file" or "s3"
    (case-insensitive, surrounding whitespace ignored). Raises ValueError
    for an unknown backend or a missing s3 bucket.
    """
    kind = (backend or "local").lower().strip()
    if kind in {"local", "fs", "filesystem", "file"}:
        cfg = LocalArtifactStoreConfig(root_dir=Path(root_dir or Path("data/artifacts")))
        return LocalArtifactStore(cfg)
    if kind == "s3":
        if not s3_bucket:
            raise ValueError("s3_bucket is required when backend='s3'")
        s3_cfg = S3ArtifactStoreConfig(
            bucket=s3_bucket,
            prefix=s3_prefix,
            region=s3_region,
            endpoint_url=s3_endpoint_url,
        )
        return S3ArtifactStore(s3_cfg)
    raise ValueError(f"Unknown artifact store backend: {kind}")
_default_store: ArtifactStore = LocalArtifactStore(
LocalArtifactStoreConfig(root_dir=Path("data/artifacts"))
)
def get_artifact_store(app: Any) -> ArtifactStore:
"""
Retrieve the artifact store from a FastAPI app, or fall back to a local store.
"""
store = getattr(getattr(app, "state", None), "artifact_store", None)
return store if store is not None else _default_store