"""Storage backends for persisting STream3R job artifacts."""

from __future__ import annotations

import json
import shutil
from pathlib import Path
from typing import Mapping

try:  # Lazy import to keep optional dependency
    import boto3
    from botocore.client import Config as Boto3Config
    from botocore.exceptions import BotoCoreError, ClientError
except ModuleNotFoundError:  # pragma: no cover - fallback when boto3 missing
    boto3 = None  # type: ignore[assignment]
    Boto3Config = None  # type: ignore[assignment]
    BotoCoreError = ClientError = Exception  # type: ignore[assignment]

from .config import WorkerSettings


class StorageError(RuntimeError):
    """Raised when artifact persistence fails."""


class StorageClient:
    """Abstract base class providing a minimal upload interface.

    Subclasses implement :meth:`build_uri`, :meth:`upload_file`,
    :meth:`upload_bytes` and :meth:`download_to_path`. Key construction and
    JSON serialisation are shared here.
    """

    def __init__(self, settings: WorkerSettings):
        self.settings = settings

    # --- key builders -------------------------------------------------
    def build_key(self, scene_id: str, *parts: str) -> str:
        """Return ``<storage_prefix>/<scene_id>/stream3r[/<part>...]``.

        Falsy *parts* are skipped; each remaining part has leading and
        trailing ``/`` stripped. The prefix and *scene_id* are used verbatim.
        """
        components = [str(self.settings.storage_prefix), str(scene_id), "stream3r"]
        for part in parts:
            if not part:
                continue
            components.append(str(part).strip("/"))
        return "/".join(components)

    def build_uri(self, key: str) -> str:
        """Return a backend-specific URI/path for *key* (subclass hook)."""
        raise NotImplementedError

    # --- upload primitives -------------------------------------------
    def upload_file(self, local_path: Path, key: str, *, content_type: str | None = None) -> str:
        """Store the file at *local_path* under *key*; return its URI."""
        raise NotImplementedError

    def upload_bytes(self, data: bytes, key: str, *, content_type: str | None = None) -> str:
        """Store *data* under *key*; return its URI."""
        raise NotImplementedError

    def upload_json(self, payload: Mapping[str, object], key: str) -> str:
        """Serialise *payload* as UTF-8 JSON and store it under *key*.

        ``allow_nan=False`` makes ``json.dumps`` raise ``ValueError`` for
        NaN/Infinity floats instead of emitting non-standard JSON.
        """
        data = json.dumps(payload, allow_nan=False).encode("utf-8")
        return self.upload_bytes(data, key, content_type="application/json")

    def download_to_path(self, key: str, destination: Path) -> Path:
        """Download an object identified by *key* into *destination*."""
        raise NotImplementedError

    # --- helpers ------------------------------------------------------
    def ensure_dir(self, scene_id: str, *parts: str) -> str:
        """Create a logical directory path under the storage prefix.

        Only builds the key (with a trailing ``/``); nothing is written to
        the backend.
        """
        key = self.build_key(scene_id, *parts)
        if not key.endswith("/"):
            key = f"{key}/"
        return key


class S3StorageClient(StorageClient):
    """S3-compatible storage backend."""

    def __init__(self, settings: WorkerSettings):
        """Build a boto3 S3 client from *settings*.

        Raises:
            StorageError: if boto3 is not installed or no bucket is configured.
        """
        if boto3 is None:  # pragma: no cover - guarded by optional dependency
            raise StorageError("boto3 is required for S3 storage but is not installed")
        # Validate configuration before creating a session/client so a missing
        # bucket fails fast without touching credentials or the network stack.
        if not settings.s3_bucket:
            raise StorageError("STREAM3R_STORAGE_BUCKET is required for S3 storage")
        super().__init__(settings)

        session_kwargs: dict[str, object] = {}
        if settings.s3_profile:
            session_kwargs["profile_name"] = settings.s3_profile
        if settings.s3_region:
            session_kwargs["region_name"] = settings.s3_region
        # Explicit credentials are only used when both halves are present.
        if settings.aws_access_key_id and settings.aws_secret_access_key:
            session_kwargs["aws_access_key_id"] = settings.aws_access_key_id
            session_kwargs["aws_secret_access_key"] = settings.aws_secret_access_key
            if settings.aws_session_token:
                session_kwargs["aws_session_token"] = settings.aws_session_token
        session = boto3.session.Session(**session_kwargs)

        config = None
        if settings.s3_force_path_style and Boto3Config is not None:
            config = Boto3Config(s3={"addressing_style": "path"})
        self._client = session.client(
            "s3",
            endpoint_url=settings.s3_endpoint_url,
            config=config,
        )

    def build_uri(self, key: str) -> str:
        """Return ``s3://<bucket>/<key>``."""
        bucket = self.settings.s3_bucket
        return f"s3://{bucket}/{key}"

    def upload_file(self, local_path: Path, key: str, *, content_type: str | None = None) -> str:
        """Upload *local_path* to *key*; return the ``s3://`` URI."""
        extra_args = {"ContentType": content_type} if content_type else None
        try:
            self._client.upload_file(str(local_path), self.settings.s3_bucket, key, ExtraArgs=extra_args)
        except (BotoCoreError, ClientError) as exc:  # pragma: no cover - network side effects
            raise StorageError(f"Failed to upload {local_path} to {key}: {exc}") from exc
        return self.build_uri(key)

    def upload_bytes(self, data: bytes, key: str, *, content_type: str | None = None) -> str:
        """Upload *data* to *key* via ``put_object``; return the ``s3://`` URI."""
        put_kwargs: dict[str, object] = {
            "Bucket": self.settings.s3_bucket,
            "Key": key,
            "Body": data,
        }
        # botocore's parameter validation rejects ContentType=None, so the
        # header is only sent when a value was actually supplied.
        if content_type:
            put_kwargs["ContentType"] = content_type
        try:
            self._client.put_object(**put_kwargs)
        except (BotoCoreError, ClientError) as exc:  # pragma: no cover - network side effects
            raise StorageError(f"Failed to upload payload to {key}: {exc}") from exc
        return self.build_uri(key)

    def download_to_path(self, key: str, destination: Path) -> Path:
        """Download *key* from the bucket into *destination*; return *destination*.

        Parent directories of *destination* are created as needed.
        """
        # NOTE(review): downloads strip a leading "/" from the key while the
        # upload methods use *key* verbatim, so keys with a leading slash do
        # not round-trip. Confirm this asymmetry is intended.
        object_key = str(key).lstrip("/")
        destination.parent.mkdir(parents=True, exist_ok=True)
        try:
            self._client.download_file(self.settings.s3_bucket, object_key, str(destination))
        except (BotoCoreError, ClientError) as exc:  # pragma: no cover - network side effects
            raise StorageError(
                f"Failed to download {object_key} from bucket {self.settings.s3_bucket} to {destination}: {exc}"
            ) from exc
        return destination


class LocalStorageClient(StorageClient):
    """On-disk storage backend for development and testing."""

    def __init__(self, settings: WorkerSettings):
        super().__init__(settings)
        self.root = settings.local_storage_root
        self.root.mkdir(parents=True, exist_ok=True)

    def _resolve(self, key: str, *, create_parents: bool = True) -> Path:
        """Map a ``/``-separated *key* to a path under :attr:`root`.

        When *create_parents* is true, the parent directory is created.

        Raises:
            StorageError: if the key contains a ``..`` component, which would
                otherwise let it escape the storage root.
        """
        components = key.split("/")
        if ".." in components:
            raise StorageError(f"Refusing key that escapes the storage root: {key}")
        path = self.root.joinpath(*components)
        if create_parents:
            path.parent.mkdir(parents=True, exist_ok=True)
        return path

    def build_uri(self, key: str) -> str:
        """Return the local filesystem path for *key* as a string."""
        return str(self._resolve(key))

    def upload_file(self, local_path: Path, key: str, *, content_type: str | None = None) -> str:  # noqa: ARG002
        """Copy *local_path* to the location for *key*; return that path."""
        destination = self._resolve(key)
        shutil.copyfile(local_path, destination)
        return str(destination)

    def upload_bytes(self, data: bytes, key: str, *, content_type: str | None = None) -> str:  # noqa: ARG002
        """Write *data* to the location for *key*; return that path."""
        destination = self._resolve(key)
        destination.write_bytes(data)
        return str(destination)

    def download_to_path(self, key: str, destination: Path) -> Path:
        """Copy the stored object for *key* to *destination*; return *destination*.

        Raises:
            StorageError: if no regular file is stored under *key*.
        """
        # Don't create directories under the root just to look up a source.
        source = self._resolve(key, create_parents=False)
        if not source.is_file():
            raise StorageError(f"Local object not found for key: {key}")
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, destination)
        return destination


def create_storage_client(settings: WorkerSettings) -> StorageClient:
    """Instantiate the appropriate storage backend.

    Returns an S3 backend when ``settings.s3_bucket`` is set, otherwise the
    on-disk backend.
    """
    if settings.s3_bucket:
        return S3StorageClient(settings)
    return LocalStorageClient(settings)