Spaces:
Configuration error
Configuration error
| """Storage backends for persisting STream3R job artifacts.""" | |
| from __future__ import annotations | |
| import json | |
| import shutil | |
| from pathlib import Path | |
| from typing import Mapping | |
| try: # Lazy import to keep optional dependency | |
| import boto3 | |
| from botocore.client import Config as Boto3Config | |
| from botocore.exceptions import BotoCoreError, ClientError | |
| except ModuleNotFoundError: # pragma: no cover - fallback when boto3 missing | |
| boto3 = None # type: ignore[assignment] | |
| Boto3Config = None # type: ignore[assignment] | |
| BotoCoreError = ClientError = Exception # type: ignore[assignment] | |
| from .config import WorkerSettings | |
class StorageError(RuntimeError):
    """Error raised when a job artifact cannot be persisted or retrieved."""
class StorageClient:
    """Abstract base class providing a minimal upload interface.

    Concrete backends implement the URI builder, the upload primitives, and
    the download primitive; the key-building helpers are shared.
    """

    def __init__(self, settings: WorkerSettings):
        self.settings = settings

    # --- key builders -------------------------------------------------

    def build_key(self, scene_id: str, *parts: str) -> str:
        """Build ``<prefix>/<scene_id>/stream3r/<parts...>`` with cleaned parts.

        Falsy parts are dropped; surrounding slashes are stripped from each
        remaining part before joining.
        """
        cleaned = [str(part).strip("/") for part in parts if part]
        return "/".join(
            [str(self.settings.storage_prefix), str(scene_id), "stream3r", *cleaned]
        )

    def build_uri(self, key: str) -> str:
        raise NotImplementedError

    # --- upload primitives -------------------------------------------

    def upload_file(self, local_path: Path, key: str, *, content_type: str | None = None) -> str:
        raise NotImplementedError

    def upload_bytes(self, data: bytes, key: str, *, content_type: str | None = None) -> str:
        raise NotImplementedError

    def upload_json(self, payload: Mapping[str, object], key: str) -> str:
        """Serialize *payload* as strict JSON (NaN rejected) and upload it."""
        serialized = json.dumps(payload, allow_nan=False)
        return self.upload_bytes(serialized.encode("utf-8"), key, content_type="application/json")

    def download_to_path(self, key: str, destination: Path) -> Path:
        """Download an object identified by *key* into *destination*."""
        raise NotImplementedError

    # --- helpers ------------------------------------------------------

    def ensure_dir(self, scene_id: str, *parts: str) -> str:
        """Create a logical directory path under the storage prefix."""
        key = self.build_key(scene_id, *parts)
        return key if key.endswith("/") else f"{key}/"
class S3StorageClient(StorageClient):
    """S3-compatible storage backend backed by boto3.

    Raises:
        StorageError: if boto3 is not installed, no bucket is configured,
            or any transfer fails.
    """

    def __init__(self, settings: WorkerSettings):
        if boto3 is None:  # pragma: no cover - guarded by optional dependency
            raise StorageError("boto3 is required for S3 storage but is not installed")
        super().__init__(settings)
        # Fail fast: validate the bucket BEFORE building a session/client so a
        # misconfiguration surfaces immediately and no resources are wasted.
        if not settings.s3_bucket:
            raise StorageError("STREAM3R_STORAGE_BUCKET is required for S3 storage")
        session_kwargs: dict[str, object] = {}
        if settings.s3_profile:
            session_kwargs["profile_name"] = settings.s3_profile
        if settings.s3_region:
            session_kwargs["region_name"] = settings.s3_region
        # Explicit credentials are only forwarded as a complete pair.
        if settings.aws_access_key_id and settings.aws_secret_access_key:
            session_kwargs["aws_access_key_id"] = settings.aws_access_key_id
            session_kwargs["aws_secret_access_key"] = settings.aws_secret_access_key
        if settings.aws_session_token:
            session_kwargs["aws_session_token"] = settings.aws_session_token
        session = boto3.session.Session(**session_kwargs)
        config = None
        if settings.s3_force_path_style and Boto3Config is not None:
            # Path-style addressing is required by MinIO and similar
            # S3-compatible servers that do not support virtual-host buckets.
            config = Boto3Config(s3={"addressing_style": "path"})
        self._client = session.client(
            "s3",
            endpoint_url=settings.s3_endpoint_url,
            config=config,
        )

    @staticmethod
    def _object_key(key: str) -> str:
        """Normalize *key*: S3 object keys must not start with a slash."""
        return str(key).lstrip("/")

    def build_uri(self, key: str) -> str:
        bucket = self.settings.s3_bucket
        return f"s3://{bucket}/{key}"

    def upload_file(self, local_path: Path, key: str, *, content_type: str | None = None) -> str:
        """Upload a file from disk and return its ``s3://`` URI."""
        object_key = self._object_key(key)
        # upload_file accepts ExtraArgs=None, so a conditional dict is fine here.
        extra_args = {"ContentType": content_type} if content_type else None
        try:
            self._client.upload_file(str(local_path), self.settings.s3_bucket, object_key, ExtraArgs=extra_args)
        except (BotoCoreError, ClientError) as exc:  # pragma: no cover - network side effects
            raise StorageError(f"Failed to upload {local_path} to {object_key}: {exc}") from exc
        return self.build_uri(object_key)

    def upload_bytes(self, data: bytes, key: str, *, content_type: str | None = None) -> str:
        """Upload an in-memory payload and return its ``s3://`` URI."""
        object_key = self._object_key(key)
        put_kwargs: dict[str, object] = {
            "Bucket": self.settings.s3_bucket,
            "Key": object_key,
            "Body": data,
        }
        # put_object rejects ContentType=None (ParamValidationError), so the
        # parameter is only included when a content type was actually given.
        if content_type:
            put_kwargs["ContentType"] = content_type
        try:
            self._client.put_object(**put_kwargs)
        except (BotoCoreError, ClientError) as exc:  # pragma: no cover - network side effects
            raise StorageError(f"Failed to upload payload to {object_key}: {exc}") from exc
        return self.build_uri(object_key)

    def download_to_path(self, key: str, destination: Path) -> Path:
        """Download the object at *key* into *destination* and return it."""
        destination.parent.mkdir(parents=True, exist_ok=True)
        # Compute the key outside the try so the error message below can
        # never reference an unbound name.
        object_key = self._object_key(key)
        try:
            self._client.download_file(self.settings.s3_bucket, object_key, str(destination))
        except (BotoCoreError, ClientError) as exc:  # pragma: no cover - network side effects
            raise StorageError(
                f"Failed to download {object_key} from bucket {self.settings.s3_bucket} to {destination}: {exc}"
            ) from exc
        return destination
class LocalStorageClient(StorageClient):
    """On-disk storage backend for development and testing.

    Keys are mapped to paths under ``settings.local_storage_root`` by
    splitting on ``/``.
    """

    def __init__(self, settings: WorkerSettings):
        super().__init__(settings)
        self.root = settings.local_storage_root
        self.root.mkdir(parents=True, exist_ok=True)

    def _path_for(self, key: str) -> Path:
        """Map *key* to a path under the root without touching the filesystem."""
        return self.root.joinpath(*key.split("/"))

    def _resolve(self, key: str) -> Path:
        """Map *key* to a path, ensuring its parent directory exists (write path)."""
        path = self._path_for(key)
        path.parent.mkdir(parents=True, exist_ok=True)
        return path

    def build_uri(self, key: str) -> str:
        # Pure lookup: previously this routed through _resolve and therefore
        # created directories as a side effect of merely building a URI.
        return str(self._path_for(key))

    def upload_file(self, local_path: Path, key: str, *, content_type: str | None = None) -> str:  # noqa: ARG002
        destination = self._resolve(key)
        shutil.copyfile(local_path, destination)
        return str(destination)

    def upload_bytes(self, data: bytes, key: str, *, content_type: str | None = None) -> str:  # noqa: ARG002
        destination = self._resolve(key)
        destination.write_bytes(data)
        return str(destination)

    def download_to_path(self, key: str, destination: Path) -> Path:
        """Copy the stored object for *key* into *destination* and return it."""
        # Read path: do not create directories while merely looking up the
        # source; a miss should leave the storage tree untouched.
        source = self._path_for(key)
        if not source.exists():
            raise StorageError(f"Local object not found for key: {key}")
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(source, destination)
        return destination
def create_storage_client(settings: WorkerSettings) -> StorageClient:
    """Instantiate the appropriate storage backend.

    A configured S3 bucket selects the S3 backend; otherwise artifacts are
    written to the local filesystem.
    """
    backend = S3StorageClient if settings.s3_bucket else LocalStorageClient
    return backend(settings)