File size: 7,672 Bytes
7a87926
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
Local NVMe cache for S3 URIs.

The orchestrator uses this to avoid re-downloading capture bundles across retries
or multi-stage pipelines.
"""

from __future__ import annotations

import hashlib
import json
import os
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple

from .orchestration.s3_io import detect_external_tools, sync_s3_prefix_to_dir


def _parse_s3_uri(uri: str) -> Tuple[str, str]:
    if not uri.startswith("s3://"):
        raise ValueError(f"Not an s3 uri: {uri}")
    s = uri[len("s3://") :]
    parts = s.split("/", 1)
    if len(parts) != 2 or not parts[0] or not parts[1]:
        raise ValueError(f"Invalid s3 uri: {uri}")
    return parts[0], parts[1]


def _sha1(s: str) -> str:
    h = hashlib.sha1()
    h.update(s.encode("utf-8"))
    return h.hexdigest()


def _acquire_lock(lock_path: Path, *, timeout_s: float = 1800.0) -> None:
    start = time.time()
    while True:
        try:
            fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_RDWR)
            os.close(fd)
            return
        except FileExistsError:
            if time.time() - start > timeout_s:
                raise TimeoutError(f"Timeout waiting for lock: {lock_path}")
            time.sleep(0.2)


def _release_lock(lock_path: Path) -> None:
    try:
        lock_path.unlink(missing_ok=True)
    except Exception:
        pass


@dataclass(frozen=True)
class NvmeCacheConfig:
    """Immutable configuration for :class:`NvmeCache`."""

    # Local directory (intended to be on NVMe) where cache entries are stored.
    root_dir: Path
    # AWS region for the boto3 session; None falls back to the environment default.
    s3_region: Optional[str] = None
    # Custom S3 endpoint URL (e.g. for an S3-compatible store); None uses AWS.
    s3_endpoint_url: Optional[str] = None
    # When True, prefix syncs prefer external tools (s5cmd/aws) over boto3.
    prefer_external_sync: bool = True


class NvmeCache:
    """Content-addressed local cache for S3 objects and prefixes.

    Entries live under ``root_dir/<sha1[:2]>/<sha1>/``. A ``meta.json`` marker
    is written only after a successful download, so its presence means the
    entry is complete; a ``download.lock`` file serializes concurrent
    downloads of the same entry across processes.
    """

    def __init__(self, cfg: NvmeCacheConfig) -> None:
        self._cfg = cfg
        # Resolve eagerly so every derived cache path is absolute and stable.
        self._root = Path(cfg.root_dir).expanduser().resolve()
        self._root.mkdir(parents=True, exist_ok=True)

    def _s3(self):
        """Build a boto3 S3 client from the configured region/endpoint.

        Raises ImportError with install instructions when boto3 is missing.
        """
        try:
            import boto3  # type: ignore
        except Exception as e:  # pragma: no cover
            raise ImportError(
                "NvmeCache S3 support requires boto3. Install with: pip install boto3"
            ) from e
        session = boto3.session.Session(region_name=self._cfg.s3_region)
        return session.client("s3", endpoint_url=self._cfg.s3_endpoint_url)

    def materialize_file(self, uri: str) -> Path:
        """
        Materialize a single file URI into the cache and return the local path.

        ``file://`` URIs and plain paths are returned as-is (no copy); only
        ``s3://`` URIs are downloaded into the cache.
        """
        if uri.startswith("file://"):
            return Path(uri.replace("file://", "", 1))
        if not uri.startswith("s3://"):
            return Path(uri)

        bucket, key = _parse_s3_uri(uri)
        digest = _sha1(uri)
        out_dir = self._root / digest[:2] / digest
        out_dir.mkdir(parents=True, exist_ok=True)
        out_path = out_dir / Path(key).name
        meta_path = out_dir / "meta.json"
        lock_path = out_dir / "download.lock"

        # Fast path: meta.json is only written after a successful download,
        # so payload + marker together mean the entry is complete.
        if out_path.exists() and meta_path.exists():
            return out_path

        _acquire_lock(lock_path)
        try:
            # Re-check under the lock: another process may have finished the
            # download while we were waiting.
            if out_path.exists() and meta_path.exists():
                return out_path
            s3 = self._s3()
            head = s3.head_object(Bucket=bucket, Key=key)
            s3.download_file(bucket, key, str(out_path))
            meta_path.write_text(
                json.dumps(
                    {
                        "uri": uri,
                        "bucket": bucket,
                        "key": key,
                        "etag": head.get("ETag"),
                        "size": int(head.get("ContentLength", 0) or 0),
                        "downloaded_at_unix_s": time.time(),
                    },
                    indent=2,
                )
            )
            return out_path
        finally:
            _release_lock(lock_path)

    def _sync_prefix_with_boto3(self, *, bucket: str, pref: str, out_dir: Path) -> Tuple[int, int]:
        """Download every object under ``s3://bucket/pref`` into *out_dir*.

        Objects already present locally with a matching size are skipped.
        Returns ``(num_files, num_bytes)`` covering both skipped and
        downloaded objects.
        """
        num_files = 0
        num_bytes = 0
        s3 = self._s3()
        paginator = s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=pref):
            for obj in page.get("Contents", []) or []:
                key = str(obj.get("Key", ""))
                # Skip directory-marker keys and empty listings.
                if not key or key.endswith("/"):
                    continue
                rel = key[len(pref) :].lstrip("/")
                dst = out_dir / rel
                dst.parent.mkdir(parents=True, exist_ok=True)
                size = int(obj.get("Size", 0) or 0)
                # Size match is a cheap (approximate) freshness check; it does
                # not detect same-size content changes.
                if dst.exists() and dst.stat().st_size == size:
                    num_files += 1
                    num_bytes += size
                    continue
                s3.download_file(bucket, key, str(dst))
                num_files += 1
                num_bytes += size
        return num_files, num_bytes

    def materialize_s3_prefix(self, *, bucket: str, prefix: str) -> Path:
        """
        Mirror an S3 prefix into the cache and return the local directory path.

        This is intended for capture bundles where we want the full directory tree.
        """
        pref = (prefix or "").lstrip("/")
        cache_key = f"s3://{bucket}/{pref}"
        digest = _sha1(cache_key)
        out_dir = (self._root / digest[:2] / digest / "prefix").resolve()
        meta_path = out_dir / "meta.json"
        lock_path = out_dir / "download.lock"
        out_dir.mkdir(parents=True, exist_ok=True)

        # meta.json is written only after a full successful sync.
        if meta_path.exists():
            return out_dir

        _acquire_lock(lock_path)
        try:
            # Re-check under the lock in case another process finished first.
            if meta_path.exists():
                return out_dir

            # Prefer external sync tools (s5cmd/aws) for high throughput.
            if bool(self._cfg.prefer_external_sync):
                tools = detect_external_tools()
                if tools.s5cmd or tools.aws:
                    sync_s3_prefix_to_dir(bucket=bucket, prefix=pref, dst_dir=out_dir, tools=tools)
                    # We don't have exact counts cheaply; record unknown sentinel.
                    num_files = -1
                    num_bytes = -1
                else:
                    # Fall back to the boto3 per-object loop.
                    num_files, num_bytes = self._sync_prefix_with_boto3(
                        bucket=bucket, pref=pref, out_dir=out_dir
                    )
            else:
                num_files, num_bytes = self._sync_prefix_with_boto3(
                    bucket=bucket, pref=pref, out_dir=out_dir
                )

            meta_path.write_text(
                json.dumps(
                    {
                        "bucket": bucket,
                        "prefix": pref,
                        "cache_key": cache_key,
                        "num_files": int(num_files),
                        "num_bytes": int(num_bytes),
                        "downloaded_at_unix_s": time.time(),
                    },
                    indent=2,
                )
            )
            return out_dir
        finally:
            _release_lock(lock_path)