| | """ |
| | Local NVMe cache for S3 URIs. |
| | |
| | The orchestrator uses this to avoid re-downloading capture bundles across retries |
| | or multi-stage pipelines. |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import hashlib |
| | import json |
| | import os |
| | import time |
| | from dataclasses import dataclass |
| | from pathlib import Path |
| | from typing import Optional, Tuple |
| |
|
| | from .orchestration.s3_io import detect_external_tools, sync_s3_prefix_to_dir |
| |
|
| |
|
| | def _parse_s3_uri(uri: str) -> Tuple[str, str]: |
| | if not uri.startswith("s3://"): |
| | raise ValueError(f"Not an s3 uri: {uri}") |
| | s = uri[len("s3://") :] |
| | parts = s.split("/", 1) |
| | if len(parts) != 2 or not parts[0] or not parts[1]: |
| | raise ValueError(f"Invalid s3 uri: {uri}") |
| | return parts[0], parts[1] |
| |
|
| |
|
| | def _sha1(s: str) -> str: |
| | h = hashlib.sha1() |
| | h.update(s.encode("utf-8")) |
| | return h.hexdigest() |
| |
|
| |
|
| | def _acquire_lock(lock_path: Path, *, timeout_s: float = 1800.0) -> None: |
| | start = time.time() |
| | while True: |
| | try: |
| | fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_RDWR) |
| | os.close(fd) |
| | return |
| | except FileExistsError: |
| | if time.time() - start > timeout_s: |
| | raise TimeoutError(f"Timeout waiting for lock: {lock_path}") |
| | time.sleep(0.2) |
| |
|
| |
|
| | def _release_lock(lock_path: Path) -> None: |
| | try: |
| | lock_path.unlink(missing_ok=True) |
| | except Exception: |
| | pass |
| |
|
| |
|
@dataclass(frozen=True)
class NvmeCacheConfig:
    """Immutable configuration for :class:`NvmeCache`."""

    # Local directory holding the cache tree (created on first use).
    root_dir: Path
    # AWS region for the boto3 session; None = boto3's default resolution.
    s3_region: Optional[str] = None
    # Custom S3 endpoint (e.g. MinIO / localstack); None = AWS default.
    s3_endpoint_url: Optional[str] = None
    # Prefer external sync tools (s5cmd / aws-cli) for prefix mirroring
    # when available, falling back to per-object boto3 downloads otherwise.
    prefer_external_sync: bool = True
| |
|
| |
|
class NvmeCache:
    """Content-addressed local cache of S3 objects and prefixes.

    Entries live under ``root/<sha1[:2]>/<sha1>/``. A ``meta.json`` marker is
    written only after a complete download, so its presence marks a valid
    entry; a ``download.lock`` file serializes concurrent downloads of the
    same entry across processes.
    """

    def __init__(self, cfg: NvmeCacheConfig) -> None:
        self._cfg = cfg
        self._root = Path(cfg.root_dir).expanduser().resolve()
        self._root.mkdir(parents=True, exist_ok=True)

    def _s3(self):
        """Build a boto3 S3 client from the cache config (boto3 imported lazily)."""
        try:
            import boto3
        except Exception as e:
            raise ImportError(
                "NvmeCache S3 support requires boto3. Install with: pip install boto3"
            ) from e
        session = boto3.session.Session(region_name=self._cfg.s3_region)
        return session.client("s3", endpoint_url=self._cfg.s3_endpoint_url)

    def materialize_file(self, uri: str) -> Path:
        """
        Materialize a single file URI into the cache and return the local path.

        ``file://`` URIs and plain filesystem paths are returned as-is (no
        copy); only ``s3://`` URIs are downloaded into the cache.
        """
        if uri.startswith("file://"):
            return Path(uri.replace("file://", "", 1))
        if not uri.startswith("s3://"):
            return Path(uri)

        bucket, key = _parse_s3_uri(uri)
        digest = _sha1(uri)
        out_dir = self._root / digest[:2] / digest
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = out_dir / Path(key).name
        meta_path = out_dir / "meta.json"
        lock_path = out_dir / "download.lock"

        # Fast path: payload + meta marker means a prior download completed.
        if out_path.exists() and meta_path.exists():
            return out_path

        _acquire_lock(lock_path)
        try:
            # Re-check under the lock: another process may have won the race.
            if out_path.exists() and meta_path.exists():
                return out_path
            s3 = self._s3()
            head = s3.head_object(Bucket=bucket, Key=key)
            s3.download_file(bucket, key, str(out_path))
            # meta.json is written last so a partial download never looks valid.
            meta_path.write_text(
                json.dumps(
                    {
                        "uri": uri,
                        "bucket": bucket,
                        "key": key,
                        "etag": head.get("ETag"),
                        "size": int(head.get("ContentLength", 0) or 0),
                        "downloaded_at_unix_s": time.time(),
                    },
                    indent=2,
                )
            )
            return out_path
        finally:
            _release_lock(lock_path)

    def _download_prefix_boto3(
        self, *, bucket: str, pref: str, out_dir: Path
    ) -> Tuple[int, int]:
        """Mirror ``s3://bucket/pref`` into *out_dir* object-by-object via boto3.

        Objects already present locally with a matching size are skipped
        (cheap resume). Returns ``(num_files, num_bytes)`` counted over all
        objects, including the skipped ones.
        """
        num_files = 0
        num_bytes = 0
        s3 = self._s3()
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=pref):
            for obj in page.get("Contents", []) or []:
                key = str(obj.get("Key", ""))
                # Skip empty keys and "directory" placeholder objects.
                if not key or key.endswith("/"):
                    continue
                rel = key[len(pref):].lstrip("/")
                if not rel:
                    # The object IS the prefix (prefix names a single file);
                    # without this guard dst would collapse onto out_dir itself
                    # and download_file would fail on a directory path.
                    rel = Path(key).name
                dst = out_dir / rel
                dst.parent.mkdir(parents=True, exist_ok=True)
                size = int(obj.get("Size", 0) or 0)
                if dst.exists() and dst.stat().st_size == size:
                    num_files += 1
                    num_bytes += size
                    continue
                s3.download_file(bucket, key, str(dst))
                num_files += 1
                num_bytes += size
        return num_files, num_bytes

    def materialize_s3_prefix(self, *, bucket: str, prefix: str) -> Path:
        """
        Mirror an S3 prefix into the cache and return the local directory path.

        This is intended for capture bundles where we want the full directory
        tree. When external sync tools (s5cmd / aws-cli) are available and
        preferred, they handle the transfer and file/byte counts are recorded
        as -1 (unknown); otherwise objects are fetched one-by-one via boto3.
        """
        pref = (prefix or "").lstrip("/")
        cache_key = f"s3://{bucket}/{pref}"
        digest = _sha1(cache_key)
        out_dir = (self._root / digest[:2] / digest / "prefix").resolve()
        meta_path = out_dir / "meta.json"
        lock_path = out_dir / "download.lock"
        out_dir.mkdir(parents=True, exist_ok=True)

        # meta.json is only written after a complete mirror -> valid entry.
        if meta_path.exists():
            return out_dir

        _acquire_lock(lock_path)
        try:
            # Re-check under the lock: another process may have won the race.
            if meta_path.exists():
                return out_dir

            # -1 means "unknown": external tools don't report counts.
            num_files = -1
            num_bytes = -1
            used_external = False
            if bool(self._cfg.prefer_external_sync):
                tools = detect_external_tools()
                if tools.s5cmd or tools.aws:
                    sync_s3_prefix_to_dir(
                        bucket=bucket, prefix=pref, dst_dir=out_dir, tools=tools
                    )
                    used_external = True
            if not used_external:
                num_files, num_bytes = self._download_prefix_boto3(
                    bucket=bucket, pref=pref, out_dir=out_dir
                )

            meta_path.write_text(
                json.dumps(
                    {
                        "bucket": bucket,
                        "prefix": pref,
                        "cache_key": cache_key,
                        "num_files": int(num_files),
                        "num_bytes": int(num_bytes),
                        "downloaded_at_unix_s": time.time(),
                    },
                    indent=2,
                )
            )
            return out_dir
        finally:
            _release_lock(lock_path)
| |
|