"""
Local NVMe cache for S3 URIs.

The orchestrator uses this to avoid re-downloading capture bundles across
retries or multi-stage pipelines.

Every cache entry lives in ``<root>/<d[:2]>/<d>/`` where ``d`` is the SHA-1 hex
digest of the entry's cache key (the S3 URI). Inside that entry directory:

* ``materialize_file`` stores the object as ``<basename>`` (or as
  ``data/<basename>`` when the basename would clash with a bookkeeping name),
  next to ``meta.json`` (completion marker) and ``download.lock``.
* ``materialize_s3_prefix`` mirrors the prefix into ``prefix/`` and keeps its
  bookkeeping *beside* the mirror (``prefix.meta.json``, ``prefix.lock``), so
  objects inside the bundle can never overwrite or impersonate it.
"""
from __future__ import annotations

import hashlib
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple

from .orchestration.s3_io import detect_external_tools, sync_s3_prefix_to_dir

# Bookkeeping names inside a cache entry directory.
_FILE_META_NAME = "meta.json"
_FILE_LOCK_NAME = "download.lock"
_PREFIX_DIR_NAME = "prefix"
_PREFIX_META_NAME = "prefix.meta.json"
_PREFIX_LOCK_NAME = "prefix.lock"
# A single-object payload whose basename equals one of these would collide with
# bookkeeping files (file and prefix entries share a directory when their cache
# keys hash identically, i.e. s3://b/x/prefix vs. bucket=b, prefix="x/prefix").
_RESERVED_ENTRY_NAMES = frozenset(
    {_FILE_META_NAME, _FILE_LOCK_NAME, _PREFIX_DIR_NAME, _PREFIX_META_NAME, _PREFIX_LOCK_NAME}
)


def _parse_s3_uri(uri: str) -> Tuple[str, str]:
    """Split ``s3://bucket/key`` into ``(bucket, key)``.

    Raises:
        ValueError: if *uri* is not an ``s3://`` URI or lacks a bucket or key.
    """
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an s3 uri: {uri}")
    s = uri[len("s3://") :]
    parts = s.split("/", 1)
    if len(parts) != 2 or not parts[0] or not parts[1]:
        raise ValueError(f"Invalid s3 uri: {uri}")
    return parts[0], parts[1]


def _sha1(s: str) -> str:
    """Return the hex SHA-1 digest of *s* encoded as UTF-8 (used only as a cache key)."""
    return hashlib.sha1(s.encode("utf-8")).hexdigest()


def _acquire_lock(lock_path: Path, *, timeout_s: float = 1800.0) -> None:
    """Block until *lock_path* can be created exclusively, polling every 0.2 s.

    The lock is the existence of the file (``O_CREAT | O_EXCL``), so it works across
    threads and processes that share the filesystem.

    NOTE(review): a holder that dies without reaching ``_release_lock`` leaves the
    file behind, and every later caller waits the full *timeout_s* before failing.

    Raises:
        TimeoutError: if the lock is still held after *timeout_s* seconds.
    """
    # Monotonic clock so wall-clock adjustments cannot shorten or extend the wait.
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_RDWR)
        except FileExistsError:
            if time.monotonic() > deadline:
                raise TimeoutError(f"Timeout waiting for lock: {lock_path}") from None
            time.sleep(0.2)
        else:
            os.close(fd)
            return


def _release_lock(lock_path: Path) -> None:
    """Remove *lock_path*; best-effort, filesystem errors are ignored."""
    try:
        lock_path.unlink(missing_ok=True)
    except OSError:
        pass


def _write_json_atomic(path: Path, payload: dict) -> None:
    """Write *payload* as indented JSON to *path* without ever exposing a partial file.

    The data goes to a temp file in the same directory and is then moved into place
    with ``os.replace``, so a crash mid-write cannot leave a truncated completion marker.
    """
    tmp = path.with_name(f"{path.name}.{os.getpid()}.tmp")
    try:
        tmp.write_text(json.dumps(payload, indent=2))
        os.replace(str(tmp), str(path))
    finally:
        # No-op after a successful replace; cleans up after a failed write.
        tmp.unlink(missing_ok=True)


def _file_payload_path(entry_dir: Path, key: str) -> Path:
    """Return where a single object with S3 *key* is stored inside *entry_dir*.

    Normally this is ``entry_dir/<basename>``. Basenames that would clash with the
    entry's bookkeeping files are placed under ``entry_dir/data/``. Basenames that are
    not usable file names (``""``, ``"."``, ``".."``) are replaced with ``"object"``.
    """
    name = Path(key).name
    if name in ("", ".", ".."):
        name = "object"
    if name in _RESERVED_ENTRY_NAMES:
        return entry_dir / "data" / name
    return entry_dir / name


@dataclass(frozen=True)
class NvmeCacheConfig:
    """Settings for :class:`NvmeCache`."""

    # Cache root; "~" is expanded and the path resolved by NvmeCache.
    root_dir: Path
    # Passed to boto3.session.Session(region_name=...).
    s3_region: Optional[str] = None
    # Passed to session.client("s3", endpoint_url=...).
    s3_endpoint_url: Optional[str] = None
    # Try external sync tools (s5cmd / aws CLI) before the boto3 loop for prefixes.
    prefer_external_sync: bool = True


class NvmeCache:
    """Materialize S3 objects and prefixes onto local disk, reusing earlier downloads.

    Each cache entry is guarded by its own lock file, so concurrent callers that share
    ``root_dir`` do not download the same entry twice.
    """

    def __init__(self, cfg: NvmeCacheConfig) -> None:
        self._cfg = cfg
        self._root = Path(cfg.root_dir).expanduser().resolve()
        self._root.mkdir(parents=True, exist_ok=True)

    def _s3(self):
        """Create a boto3 S3 client from the configured region and endpoint.

        Raises:
            ImportError: if boto3 is not installed.
        """
        try:
            import boto3  # type: ignore
        except Exception as e:  # pragma: no cover
            raise ImportError(
                "NvmeCache S3 support requires boto3. Install with: pip install boto3"
            ) from e
        session = boto3.session.Session(region_name=self._cfg.s3_region)
        return session.client("s3", endpoint_url=self._cfg.s3_endpoint_url)

    def materialize_file(self, uri: str) -> Path:
        """
        Materialize a single file URI into the cache and return the local path.

        ``file://`` URIs and plain paths are returned as paths without touching the
        cache. ``s3://`` URIs are downloaded once, and ``meta.json`` (bucket, key, ETag,
        size, download time) is written after the download as the completion marker.

        Raises:
            ValueError: for a malformed ``s3://`` URI.
            TimeoutError: if another downloader holds the entry lock too long.
        """
        if uri.startswith("file://"):
            return Path(uri[len("file://") :])
        if not uri.startswith("s3://"):
            return Path(uri)

        bucket, key = _parse_s3_uri(uri)
        digest = _sha1(uri)
        out_dir = self._root / digest[:2] / digest
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = _file_payload_path(out_dir, key)
        meta_path = out_dir / _FILE_META_NAME
        lock_path = out_dir / _FILE_LOCK_NAME

        # Lock-free fast path; the check is repeated under the lock.
        if out_path.exists() and meta_path.exists():
            return out_path

        _acquire_lock(lock_path)
        try:
            if out_path.exists() and meta_path.exists():
                return out_path
            out_path.parent.mkdir(parents=True, exist_ok=True)
            s3 = self._s3()
            head = s3.head_object(Bucket=bucket, Key=key)
            s3.download_file(bucket, key, str(out_path))
            _write_json_atomic(
                meta_path,
                {
                    "uri": uri,
                    "bucket": bucket,
                    "key": key,
                    "etag": head.get("ETag"),
                    "size": int(head.get("ContentLength", 0) or 0),
                    "downloaded_at_unix_s": time.time(),
                },
            )
            return out_path
        finally:
            _release_lock(lock_path)

    def materialize_s3_prefix(self, *, bucket: str, prefix: str) -> Path:
        """
        Mirror an S3 prefix into the cache and return the local directory path.

        This is intended for capture bundles where we want the full directory tree.
        The completion marker (``prefix.meta.json``) and lock (``prefix.lock``) sit
        next to the returned directory, not inside it, so bundle contents cannot
        collide with them.

        NOTE(review): entries created by earlier versions kept ``meta.json`` inside the
        mirror. Those entries are re-synced once. Processes running the old layout do
        not share this lock.

        Raises:
            ValueError: if an object key would be written outside the mirror
                (boto3 fallback only).
            TimeoutError: if another downloader holds the entry lock too long.
        """
        pref = (prefix or "").lstrip("/")
        cache_key = f"s3://{bucket}/{pref}"
        digest = _sha1(cache_key)
        entry_dir = (self._root / digest[:2] / digest).resolve()
        out_dir = entry_dir / _PREFIX_DIR_NAME
        meta_path = entry_dir / _PREFIX_META_NAME
        lock_path = entry_dir / _PREFIX_LOCK_NAME

        out_dir.mkdir(parents=True, exist_ok=True)
        # Lock-free fast path; the check is repeated under the lock.
        if meta_path.exists():
            return out_dir

        _acquire_lock(lock_path)
        try:
            if meta_path.exists():
                return out_dir

            tools = detect_external_tools() if self._cfg.prefer_external_sync else None
            if tools is not None and (tools.s5cmd or tools.aws):
                # External sync tools (s5cmd/aws) give higher throughput, but do not report
                # counts cheaply, so -1 is recorded as an "unknown" sentinel.
                sync_s3_prefix_to_dir(bucket=bucket, prefix=pref, dst_dir=out_dir, tools=tools)
                num_files, num_bytes = -1, -1
            else:
                num_files, num_bytes = self._mirror_prefix_with_boto3(
                    bucket=bucket, prefix=pref, out_dir=out_dir
                )

            _write_json_atomic(
                meta_path,
                {
                    "bucket": bucket,
                    "prefix": pref,
                    "cache_key": cache_key,
                    "num_files": int(num_files),
                    "num_bytes": int(num_bytes),
                    "downloaded_at_unix_s": time.time(),
                },
            )
            return out_dir
        finally:
            _release_lock(lock_path)

    def _mirror_prefix_with_boto3(
        self, *, bucket: str, prefix: str, out_dir: Path
    ) -> Tuple[int, int]:
        """Download every object under ``s3://bucket/prefix`` into *out_dir* using boto3.

        Objects whose local copy already has the listed size are not downloaded again.
        Keys ending in ``/`` (folder placeholders) are skipped, as is a key equal to the
        prefix itself, which has no relative path inside the mirror.

        Returns:
            ``(num_files, num_bytes)`` over every mirrored object, including ones
            that were already present.

        Raises:
            ValueError: if a key would land outside *out_dir* (e.g. via ``..``).
        """
        s3 = self._s3()
        paginator = s3.get_paginator("list_objects_v2")
        num_files = 0
        num_bytes = 0
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []) or []:
                key = str(obj.get("Key", ""))
                if not key or key.endswith("/"):
                    continue
                rel = key[len(prefix) :].lstrip("/")
                # Lexical normalization collapses "." and ".." before the containment check.
                dst = Path(os.path.normpath(out_dir / rel))
                if dst == out_dir:
                    # The key is the prefix itself; there is no file name to mirror it under.
                    continue
                if out_dir not in dst.parents:
                    raise ValueError(f"S3 key escapes cache directory: s3://{bucket}/{key}")
                dst.parent.mkdir(parents=True, exist_ok=True)
                size = int(obj.get("Size", 0) or 0)
                if not (dst.exists() and dst.stat().st_size == size):
                    s3.download_file(bucket, key, str(dst))
                num_files += 1
                num_bytes += size
        return num_files, num_bytes