# Commit note (brian4dwell): init worker working — commit 1c5aca1
"""Storage backends for persisting STream3R job artifacts."""
from __future__ import annotations
import json
import shutil
from pathlib import Path
from typing import Mapping
try: # Lazy import to keep optional dependency
import boto3
from botocore.client import Config as Boto3Config
from botocore.exceptions import BotoCoreError, ClientError
except ModuleNotFoundError: # pragma: no cover - fallback when boto3 missing
boto3 = None # type: ignore[assignment]
Boto3Config = None # type: ignore[assignment]
BotoCoreError = ClientError = Exception # type: ignore[assignment]
from .config import WorkerSettings
class StorageError(RuntimeError):
    """Signal a failure while persisting or retrieving job artifacts."""
class StorageClient:
    """Abstract base class providing a minimal upload interface.

    Concrete backends supply the upload/download primitives; the key
    builders and helpers defined here are shared by every backend.
    """

    def __init__(self, settings: WorkerSettings):
        self.settings = settings

    # --- key builders -------------------------------------------------
    def build_key(self, scene_id: str, *parts: str) -> str:
        """Join the storage prefix, scene id, and extra *parts* into a key.

        Falsy parts are dropped; surviving parts have surrounding slashes
        stripped before joining.
        """
        pieces = [str(self.settings.storage_prefix), str(scene_id), "stream3r"]
        pieces.extend(str(part).strip("/") for part in parts if part)
        return "/".join(pieces)

    def build_uri(self, key: str) -> str:
        """Return the backend-specific URI for *key*."""
        raise NotImplementedError

    # --- upload primitives -------------------------------------------
    def upload_file(self, local_path: Path, key: str, *, content_type: str | None = None) -> str:
        """Persist the file at *local_path* under *key*; return its URI."""
        raise NotImplementedError

    def upload_bytes(self, data: bytes, key: str, *, content_type: str | None = None) -> str:
        """Persist raw *data* under *key*; return its URI."""
        raise NotImplementedError

    def upload_json(self, payload: Mapping[str, object], key: str) -> str:
        """Serialize *payload* as JSON and store it under *key*."""
        encoded = json.dumps(payload, allow_nan=False).encode("utf-8")
        return self.upload_bytes(encoded, key, content_type="application/json")

    def download_to_path(self, key: str, destination: Path) -> Path:
        """Download an object identified by *key* into *destination*."""
        raise NotImplementedError

    # --- helpers ------------------------------------------------------
    def ensure_dir(self, scene_id: str, *parts: str) -> str:
        """Create a logical directory path under the storage prefix."""
        key = self.build_key(scene_id, *parts)
        return key if key.endswith("/") else f"{key}/"
class S3StorageClient(StorageClient):
    """S3-compatible storage backend.

    Works against AWS S3 and S3-compatible services (via
    ``s3_endpoint_url`` and optional path-style addressing).

    Raises:
        StorageError: if boto3 is not installed or no bucket is configured.
    """

    def __init__(self, settings: WorkerSettings):
        if boto3 is None:  # pragma: no cover - guarded by optional dependency
            raise StorageError("boto3 is required for S3 storage but is not installed")
        if not settings.s3_bucket:
            # Fail fast before building a session/client we could never use.
            raise StorageError("STREAM3R_STORAGE_BUCKET is required for S3 storage")
        super().__init__(settings)
        session_kwargs: dict[str, object] = {}
        if settings.s3_profile:
            session_kwargs["profile_name"] = settings.s3_profile
        if settings.s3_region:
            session_kwargs["region_name"] = settings.s3_region
        if settings.aws_access_key_id and settings.aws_secret_access_key:
            session_kwargs["aws_access_key_id"] = settings.aws_access_key_id
            session_kwargs["aws_secret_access_key"] = settings.aws_secret_access_key
            # A session token is only meaningful alongside explicit keys.
            if settings.aws_session_token:
                session_kwargs["aws_session_token"] = settings.aws_session_token
        session = boto3.session.Session(**session_kwargs)
        config = None
        if settings.s3_force_path_style and Boto3Config is not None:
            # Path-style addressing is required by many S3-compatible servers.
            config = Boto3Config(s3={"addressing_style": "path"})
        self._client = session.client(
            "s3",
            endpoint_url=settings.s3_endpoint_url,
            config=config,
        )

    def build_uri(self, key: str) -> str:
        """Return the canonical ``s3://bucket/key`` URI for *key*."""
        return f"s3://{self.settings.s3_bucket}/{key}"

    def upload_file(self, local_path: Path, key: str, *, content_type: str | None = None) -> str:
        """Upload the file at *local_path* to *key*; return its S3 URI.

        Raises:
            StorageError: if the underlying transfer fails.
        """
        extra_args = {"ContentType": content_type} if content_type else None
        try:
            self._client.upload_file(str(local_path), self.settings.s3_bucket, key, ExtraArgs=extra_args)
        except (BotoCoreError, ClientError) as exc:  # pragma: no cover - network side effects
            raise StorageError(f"Failed to upload {local_path} to {key}: {exc}") from exc
        return self.build_uri(key)

    def upload_bytes(self, data: bytes, key: str, *, content_type: str | None = None) -> str:
        """Upload raw *data* to *key*; return its S3 URI.

        Raises:
            StorageError: if the underlying put fails.
        """
        put_kwargs: dict[str, object] = {
            "Bucket": self.settings.s3_bucket,
            "Key": key,
            "Body": data,
        }
        # Only include ContentType when set: boto3 rejects ContentType=None
        # with a parameter validation error, so untyped uploads would fail.
        if content_type:
            put_kwargs["ContentType"] = content_type
        try:
            self._client.put_object(**put_kwargs)
        except (BotoCoreError, ClientError) as exc:  # pragma: no cover - network side effects
            raise StorageError(f"Failed to upload payload to {key}: {exc}") from exc
        return self.build_uri(key)

    def download_to_path(self, key: str, destination: Path) -> Path:
        """Fetch the object at *key* into *destination* and return it.

        Raises:
            StorageError: if the download fails.
        """
        destination.parent.mkdir(parents=True, exist_ok=True)
        # S3 keys never carry a leading slash; strip it defensively.
        object_key = str(key).lstrip("/")
        try:
            self._client.download_file(self.settings.s3_bucket, object_key, str(destination))
        except (BotoCoreError, ClientError) as exc:  # pragma: no cover - network side effects
            raise StorageError(
                f"Failed to download {object_key} from bucket {self.settings.s3_bucket} to {destination}: {exc}"
            ) from exc
        return destination
class LocalStorageClient(StorageClient):
    """On-disk storage backend for development and testing.

    Keys map directly onto paths below ``settings.local_storage_root``.
    """

    def __init__(self, settings: WorkerSettings):
        super().__init__(settings)
        self.root = settings.local_storage_root
        self.root.mkdir(parents=True, exist_ok=True)

    def _resolve(self, key: str) -> Path:
        """Map *key* onto a filesystem path below the storage root.

        Pure path arithmetic — previously this also created the parent
        directory, which made read-only callers (``build_uri``, the source
        lookup in ``download_to_path``) litter the tree with empty dirs.
        Directories are now created only by the write paths.
        """
        return self.root.joinpath(*key.split("/"))

    def build_uri(self, key: str) -> str:
        """Return the absolute filesystem path *key* resolves to."""
        return str(self._resolve(key))

    def upload_file(self, local_path: Path, key: str, *, content_type: str | None = None) -> str:  # noqa: ARG002
        """Copy *local_path* under the storage root; return the new path."""
        destination = self._resolve(key)
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(local_path, destination)
        return str(destination)

    def upload_bytes(self, data: bytes, key: str, *, content_type: str | None = None) -> str:  # noqa: ARG002
        """Write *data* under the storage root; return the new path."""
        destination = self._resolve(key)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_bytes(data)
        return str(destination)

    def download_to_path(self, key: str, destination: Path) -> Path:
        """Copy the stored object for *key* into *destination*.

        Raises:
            StorageError: if no object exists for *key*.
        """
        source = self._resolve(key)
        if not source.exists():
            raise StorageError(f"Local object not found for key: {key}")
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, destination)
        return destination
def create_storage_client(settings: WorkerSettings) -> StorageClient:
    """Instantiate the appropriate storage backend.

    The S3 backend is chosen whenever a bucket is configured; otherwise
    the local on-disk backend is used.
    """
    backend = S3StorageClient if settings.s3_bucket else LocalStorageClient
    return backend(settings)