"""
Local NVMe cache for S3 URIs.
The orchestrator uses this to avoid re-downloading capture bundles across retries
or multi-stage pipelines.
"""
from __future__ import annotations
import hashlib
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
from .orchestration.s3_io import detect_external_tools, sync_s3_prefix_to_dir
def _parse_s3_uri(uri: str) -> Tuple[str, str]:
if not uri.startswith("s3://"):
raise ValueError(f"Not an s3 uri: {uri}")
s = uri[len("s3://") :]
parts = s.split("/", 1)
if len(parts) != 2 or not parts[0] or not parts[1]:
raise ValueError(f"Invalid s3 uri: {uri}")
return parts[0], parts[1]
def _sha1(s: str) -> str:
h = hashlib.sha1()
h.update(s.encode("utf-8"))
return h.hexdigest()
def _acquire_lock(lock_path: Path, *, timeout_s: float = 1800.0) -> None:
start = time.time()
while True:
try:
fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_RDWR)
os.close(fd)
return
except FileExistsError:
if time.time() - start > timeout_s:
raise TimeoutError(f"Timeout waiting for lock: {lock_path}")
time.sleep(0.2)
def _release_lock(lock_path: Path) -> None:
try:
lock_path.unlink(missing_ok=True)
except Exception:
pass
@dataclass(frozen=True)
class NvmeCacheConfig:
    """Immutable configuration for :class:`NvmeCache`."""

    # Root directory on local disk under which cache entries are stored.
    root_dir: Path
    # Region passed to the boto3 session; None defers to boto3's defaults.
    s3_region: Optional[str] = None
    # Custom endpoint URL passed to the boto3 S3 client; None uses the default.
    s3_endpoint_url: Optional[str] = None
    # When True, prefix mirroring tries external tools (s5cmd/aws CLI)
    # before falling back to a boto3 download loop.
    prefer_external_sync: bool = True
class NvmeCache:
    """Content-addressed local cache for S3 objects and prefixes.

    Entries live under ``root_dir/<sha1[:2]>/<sha1>``, keyed on the full
    source URI. A ``meta.json`` marker file is written only after a
    successful download, so its presence means the entry is complete; a
    ``download.lock`` file serializes concurrent downloads of the same
    entry across processes.
    """

    def __init__(self, cfg: NvmeCacheConfig) -> None:
        self._cfg = cfg
        self._root = Path(cfg.root_dir).expanduser().resolve()
        self._root.mkdir(parents=True, exist_ok=True)

    def _s3(self):
        """Return a boto3 S3 client built from the cache config.

        Raises:
            ImportError: if boto3 (an optional dependency) is not installed.
        """
        try:
            import boto3  # type: ignore
        except Exception as e:  # pragma: no cover
            raise ImportError(
                "NvmeCache S3 support requires boto3. Install with: pip install boto3"
            ) from e
        session = boto3.session.Session(region_name=self._cfg.s3_region)
        return session.client("s3", endpoint_url=self._cfg.s3_endpoint_url)

    def materialize_file(self, uri: str) -> Path:
        """Materialize a single file URI into the cache and return the local path.

        ``file://`` URIs and plain local paths are returned directly without
        copying; only ``s3://`` URIs are downloaded into the cache.

        Raises:
            ValueError: if an ``s3://`` URI is malformed.
            TimeoutError: if the per-entry download lock cannot be acquired.
        """
        if uri.startswith("file://"):
            return Path(uri.replace("file://", "", 1))
        if not uri.startswith("s3://"):
            return Path(uri)
        bucket, key = _parse_s3_uri(uri)
        digest = _sha1(uri)
        out_dir = self._root / digest[:2] / digest
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = out_dir / Path(key).name
        meta_path = out_dir / "meta.json"
        lock_path = out_dir / "download.lock"
        # Fast path: meta.json is written only after a complete download.
        if out_path.exists() and meta_path.exists():
            return out_path
        _acquire_lock(lock_path)
        try:
            # Re-check under the lock: another process may have finished
            # the download while we were waiting.
            if out_path.exists() and meta_path.exists():
                return out_path
            s3 = self._s3()
            head = s3.head_object(Bucket=bucket, Key=key)
            s3.download_file(bucket, key, str(out_path))
            meta_path.write_text(
                json.dumps(
                    {
                        "uri": uri,
                        "bucket": bucket,
                        "key": key,
                        "etag": head.get("ETag"),
                        "size": int(head.get("ContentLength", 0) or 0),
                        "downloaded_at_unix_s": time.time(),
                    },
                    indent=2,
                )
            )
            return out_path
        finally:
            _release_lock(lock_path)

    def _mirror_prefix_with_boto3(
        self, *, bucket: str, prefix: str, out_dir: Path
    ) -> Tuple[int, int]:
        """Download every object under *prefix* into *out_dir* using boto3.

        Objects already present locally with a matching byte size are
        skipped. Returns ``(num_files, num_bytes)`` aggregated over all
        listed objects, including the skipped ones.
        """
        num_files = 0
        num_bytes = 0
        s3 = self._s3()
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []) or []:
                key = str(obj.get("Key", ""))
                if not key or key.endswith("/"):
                    # Skip empty keys and "directory" placeholder objects.
                    continue
                rel = key[len(prefix) :].lstrip("/")
                dst = out_dir / rel
                dst.parent.mkdir(parents=True, exist_ok=True)
                size = int(obj.get("Size", 0) or 0)
                if not (dst.exists() and dst.stat().st_size == size):
                    s3.download_file(bucket, key, str(dst))
                num_files += 1
                num_bytes += size
        return num_files, num_bytes

    def materialize_s3_prefix(self, *, bucket: str, prefix: str) -> Path:
        """Mirror an S3 prefix into the cache and return the local directory path.

        This is intended for capture bundles where we want the full directory
        tree. Uses external sync tools (s5cmd/aws) for throughput when
        configured and available, otherwise falls back to a boto3 loop.

        Raises:
            TimeoutError: if the per-entry download lock cannot be acquired.
        """
        pref = (prefix or "").lstrip("/")
        cache_key = f"s3://{bucket}/{pref}"
        digest = _sha1(cache_key)
        out_dir = (self._root / digest[:2] / digest / "prefix").resolve()
        meta_path = out_dir / "meta.json"
        lock_path = out_dir / "download.lock"
        out_dir.mkdir(parents=True, exist_ok=True)
        # meta.json marks a completed mirror; partial mirrors are re-synced.
        if meta_path.exists():
            return out_dir
        _acquire_lock(lock_path)
        try:
            if meta_path.exists():
                return out_dir
            used_external = False
            if bool(self._cfg.prefer_external_sync):
                # Prefer external sync tools (s5cmd/aws) for high throughput.
                tools = detect_external_tools()
                if tools.s5cmd or tools.aws:
                    sync_s3_prefix_to_dir(
                        bucket=bucket, prefix=pref, dst_dir=out_dir, tools=tools
                    )
                    used_external = True
            if used_external:
                # Exact counts are not cheaply available from the external
                # tools; -1 is the "unknown" sentinel.
                num_files = -1
                num_bytes = -1
            else:
                num_files, num_bytes = self._mirror_prefix_with_boto3(
                    bucket=bucket, prefix=pref, out_dir=out_dir
                )
            meta_path.write_text(
                json.dumps(
                    {
                        "bucket": bucket,
                        "prefix": pref,
                        "cache_key": cache_key,
                        "num_files": int(num_files),
                        "num_bytes": int(num_bytes),
                        "downloaded_at_unix_s": time.time(),
                    },
                    indent=2,
                )
            )
            return out_dir
        finally:
            _release_lock(lock_path)