# 3d_model/ylff/services/nvme_cache.py
# Author: Azan — "Clean deployment build (Squashed)" (commit 7a87926)
"""
Local NVMe cache for S3 URIs.
The orchestrator uses this to avoid re-downloading capture bundles across retries
or multi-stage pipelines.
"""
from __future__ import annotations
import hashlib
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
from .orchestration.s3_io import detect_external_tools, sync_s3_prefix_to_dir
def _parse_s3_uri(uri: str) -> Tuple[str, str]:
if not uri.startswith("s3://"):
raise ValueError(f"Not an s3 uri: {uri}")
s = uri[len("s3://") :]
parts = s.split("/", 1)
if len(parts) != 2 or not parts[0] or not parts[1]:
raise ValueError(f"Invalid s3 uri: {uri}")
return parts[0], parts[1]
def _sha1(s: str) -> str:
h = hashlib.sha1()
h.update(s.encode("utf-8"))
return h.hexdigest()
def _acquire_lock(lock_path: Path, *, timeout_s: float = 1800.0) -> None:
start = time.time()
while True:
try:
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_RDWR)
os.close(fd)
return
except FileExistsError:
if time.time() - start > timeout_s:
raise TimeoutError(f"Timeout waiting for lock: {lock_path}")
time.sleep(0.2)
def _release_lock(lock_path: Path) -> None:
try:
lock_path.unlink(missing_ok=True)
except Exception:
pass
@dataclass(frozen=True)
class NvmeCacheConfig:
    """Immutable configuration for :class:`NvmeCache`."""
    # Root directory of the on-disk cache (expanded and resolved by NvmeCache).
    root_dir: Path
    # AWS region for the boto3 session; None defers to the default credential chain.
    s3_region: Optional[str] = None
    # Custom S3 endpoint URL (e.g. for an S3-compatible store); None means AWS default.
    s3_endpoint_url: Optional[str] = None
    # When True, prefix mirroring tries external tools (s5cmd/aws CLI) before boto3.
    prefer_external_sync: bool = True
class NvmeCache:
    """Content-addressed local cache for S3 files and prefixes.

    Layout: ``<root>/<sha1[:2]>/<sha1>/...`` keyed by the full S3 URI, so
    distinct URIs never collide. A ``meta.json`` written after a successful
    download doubles as the completion marker; concurrent downloads of the
    same entry are serialized via a lock file (``_acquire_lock``).
    """

    def __init__(self, cfg: NvmeCacheConfig) -> None:
        self._cfg = cfg
        self._root = Path(cfg.root_dir).expanduser().resolve()
        self._root.mkdir(parents=True, exist_ok=True)

    def _s3(self):
        """Build a boto3 S3 client lazily so boto3 remains an optional dependency."""
        try:
            import boto3  # type: ignore
        except Exception as e:  # pragma: no cover
            raise ImportError(
                "NvmeCache S3 support requires boto3. Install with: pip install boto3"
            ) from e
        session = boto3.session.Session(region_name=self._cfg.s3_region)
        return session.client("s3", endpoint_url=self._cfg.s3_endpoint_url)

    def materialize_file(self, uri: str) -> Path:
        """
        Materialize a single file URI into the cache and return the local path.

        ``file://`` URIs and bare paths are returned as-is (no caching); only
        ``s3://`` URIs are downloaded. ``meta.json`` is written after the
        download, so a partially-downloaded file is never treated as a hit.
        """
        if uri.startswith("file://"):
            return Path(uri.replace("file://", "", 1))
        if not uri.startswith("s3://"):
            return Path(uri)
        bucket, key = _parse_s3_uri(uri)
        digest = _sha1(uri)
        out_dir = self._root / digest[:2] / digest
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = out_dir / Path(key).name
        meta_path = out_dir / "meta.json"
        lock_path = out_dir / "download.lock"
        # Fast path: a previous run already completed this entry.
        if out_path.exists() and meta_path.exists():
            return out_path
        _acquire_lock(lock_path)
        try:
            # Re-check under the lock: a peer may have finished while we waited.
            if out_path.exists() and meta_path.exists():
                return out_path
            s3 = self._s3()
            head = s3.head_object(Bucket=bucket, Key=key)
            s3.download_file(bucket, key, str(out_path))
            # Written last: this is the commit marker for the cache entry.
            meta_path.write_text(
                json.dumps(
                    {
                        "uri": uri,
                        "bucket": bucket,
                        "key": key,
                        "etag": head.get("ETag"),
                        "size": int(head.get("ContentLength", 0) or 0),
                        "downloaded_at_unix_s": time.time(),
                    },
                    indent=2,
                )
            )
            return out_path
        finally:
            _release_lock(lock_path)

    def _mirror_prefix_with_boto3(
        self, *, bucket: str, prefix: str, out_dir: Path
    ) -> Tuple[int, int]:
        """Download every object under *prefix* with boto3 into *out_dir*.

        Returns ``(num_files, num_bytes)`` over all objects (including ones
        skipped because they already exist locally with a matching size, which
        makes interrupted mirrors resumable).

        Fix: this loop was duplicated verbatim in two branches of
        ``materialize_s3_prefix``; it is now a single helper.
        """
        num_files = 0
        num_bytes = 0
        s3 = self._s3()
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []) or []:
                key = str(obj.get("Key", ""))
                # Skip empty keys and directory placeholder objects.
                if not key or key.endswith("/"):
                    continue
                rel = key[len(prefix):].lstrip("/")
                dst = out_dir / rel
                dst.parent.mkdir(parents=True, exist_ok=True)
                size = int(obj.get("Size", 0) or 0)
                # A size match is our cheap "already downloaded" test.
                if not (dst.exists() and dst.stat().st_size == size):
                    s3.download_file(bucket, key, str(dst))
                num_files += 1
                num_bytes += size
        return num_files, num_bytes

    def materialize_s3_prefix(self, *, bucket: str, prefix: str) -> Path:
        """
        Mirror an S3 prefix into the cache and return the local directory path.
        This is intended for capture bundles where we want the full directory tree.

        ``meta.json`` marks completion; when external tools performed the sync
        the file/byte counts are recorded as the sentinel ``-1`` because those
        tools don't report exact counts cheaply.
        """
        pref = (prefix or "").lstrip("/")
        cache_key = f"s3://{bucket}/{pref}"
        digest = _sha1(cache_key)
        out_dir = (self._root / digest[:2] / digest / "prefix").resolve()
        meta_path = out_dir / "meta.json"
        lock_path = out_dir / "download.lock"
        out_dir.mkdir(parents=True, exist_ok=True)
        # Fast path: a previous mirror completed (meta.json is the marker).
        if meta_path.exists():
            return out_dir
        _acquire_lock(lock_path)
        try:
            # Re-check under the lock: a peer may have finished while we waited.
            if meta_path.exists():
                return out_dir
            num_files = 0
            num_bytes = 0
            synced_externally = False
            # Prefer external sync tools (s5cmd/aws) for high throughput.
            if bool(self._cfg.prefer_external_sync):
                tools = detect_external_tools()
                if tools.s5cmd or tools.aws:
                    sync_s3_prefix_to_dir(
                        bucket=bucket, prefix=pref, dst_dir=out_dir, tools=tools
                    )
                    # We don't have exact counts cheaply; record unknown sentinel.
                    num_files = -1
                    num_bytes = -1
                    synced_externally = True
            if not synced_externally:
                # Fall back to the boto3 object-by-object mirror.
                num_files, num_bytes = self._mirror_prefix_with_boto3(
                    bucket=bucket, prefix=pref, out_dir=out_dir
                )
            meta_path.write_text(
                json.dumps(
                    {
                        "bucket": bucket,
                        "prefix": pref,
                        "cache_key": cache_key,
                        "num_files": int(num_files),
                        "num_bytes": int(num_bytes),
                        "downloaded_at_unix_s": time.time(),
                    },
                    indent=2,
                )
            )
            return out_dir
        finally:
            _release_lock(lock_path)