| |
| from __future__ import annotations |
|
|
| import io, tempfile, os |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Iterable, Optional |
|
|
@dataclass
class BlobInfo:
    """Metadata record for a single entry (file or directory) in a BlobStore."""

    # Entry key relative to the store root, e.g. "subdir/file.txt".
    key: str
    # Size in bytes; None when unknown or when the entry is a directory.
    size: Optional[int] = None
    # Last-modified timestamp in string form; LocalStore never populates
    # this — presumably filled by remote backends (confirm against them).
    modified: Optional[str] = None
    # True when the entry represents a directory/prefix rather than a file.
    is_dir: bool = False
|
|
class BlobStore:
    """Minimal key/value blob backend interface.

    Subclasses must implement ``list``, ``read_bytes``, ``write_bytes``
    and ``head``; ``exists`` and ``download_to`` are generic helpers
    built on top of those primitives.
    """

    def list(self, prefix: str = "", recursive: bool = False) -> Iterable[BlobInfo]:
        raise NotImplementedError

    def read_bytes(self, key: str) -> bytes:
        raise NotImplementedError

    def write_bytes(self, key: str, data: bytes, content_type: Optional[str] = None) -> None:
        raise NotImplementedError

    def head(self, key: str) -> BlobInfo:
        raise NotImplementedError

    def exists(self, key: str) -> bool:
        """Return True iff ``head(key)`` succeeds.

        The broad except is deliberate: each backend raises its own
        exception type for a missing key.
        """
        try:
            self.head(key)
        except Exception:
            return False
        return True

    def download_to(self, key: str, dest_path: Path) -> Path:
        """Copy the blob at *key* to *dest_path*, creating parent dirs.

        Returns *dest_path* for call chaining.
        """
        payload = self.read_bytes(key)
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        dest_path.write_bytes(payload)
        return dest_path
|
|
| |
class LocalStore(BlobStore):
    """BlobStore backed by the local filesystem under a single root directory."""

    def __init__(self, root: str | Path | None = None):
        """
        Always land on a writable directory.

        Priority:
          1) explicit root (if provided)
          2) APP_DATA_DIR (if set)
          3) <tempdir>/label_assistant (always writable on Spaces/containers)
        """
        # Read the env var once: avoids a second lookup whose Optional[str]
        # result was previously passed straight to Path().
        env_dir = os.getenv("APP_DATA_DIR")
        if root:
            base = Path(root)
        elif env_dir:
            base = Path(env_dir)
        else:
            base = Path(tempfile.gettempdir()) / "label_assistant"

        try:
            base.mkdir(parents=True, exist_ok=True)
        except Exception:
            # Chosen directory is not creatable/writable: fall back to the
            # temp dir, assumed writable in container environments.
            base = Path(tempfile.gettempdir()) / "label_assistant"
            base.mkdir(parents=True, exist_ok=True)

        self.root = base
        print(f"[LocalStore] using {self.root}", flush=True)

    def _p(self, key: str) -> Path:
        """Map a store key to an absolute filesystem path under root.

        NOTE(review): no normalization — a key containing ".." escapes
        self.root. Acceptable for trusted keys; confirm callers never
        pass user-controlled paths here.
        """
        return self.root / key

    def list(self, prefix: str = "", recursive: bool = False) -> Iterable[BlobInfo]:
        """Yield BlobInfo entries under *prefix*.

        Keys are paths relative to the store root. A prefix naming a file
        yields exactly that file; a missing prefix yields nothing. The
        non-recursive form also yields directory entries (is_dir=True).
        """
        base = self._p(prefix) if prefix else self.root
        if base.is_file():
            yield BlobInfo(prefix, size=base.stat().st_size)
            return
        if not base.exists():
            return
        if recursive:
            for p in base.rglob("*"):
                if p.is_file():
                    rel = str(p.relative_to(self.root))
                    yield BlobInfo(rel, size=p.stat().st_size)
        else:
            for p in base.glob("*"):
                rel = str(p.relative_to(self.root))
                yield BlobInfo(
                    rel,
                    size=(p.stat().st_size if p.is_file() else None),
                    is_dir=p.is_dir(),
                )

    def read_bytes(self, key: str) -> bytes:
        """Return the blob's contents; raises FileNotFoundError when absent."""
        return self._p(key).read_bytes()

    def write_bytes(self, key: str, data: bytes, content_type: Optional[str] = None) -> None:
        """Write *data* at *key*, creating parent directories as needed.

        *content_type* exists for interface parity and is ignored locally.
        """
        p = self._p(key)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_bytes(data)

    def head(self, key: str) -> BlobInfo:
        """Stat *key*; propagates OSError (FileNotFoundError) when missing —
        BlobStore.exists relies on that exception."""
        p = self._p(key)
        s = p.stat()
        return BlobInfo(key, size=(s.st_size if p.is_file() else None), is_dir=p.is_dir())
|
|
| |
def get_store_from_env(kind: Optional[str] = None) -> BlobStore:
    """
    Chooses a backend from environment:
      - kind='s3' or AWS_* present -> S3
      - kind in ('gdrive','drive') or GDRIVE_* present -> Google Drive
      - default -> LocalStore at ./remote_cache
    """
    backend = kind if kind else os.getenv("BLOB_BACKEND", "")
    backend = backend.lower()

    # S3 wins when explicitly requested or when a bucket is configured.
    if backend == "s3" or os.getenv("AWS_S3_BUCKET"):
        from cloud.storage_s3 import S3Store

        return S3Store(
            bucket=os.getenv("AWS_S3_BUCKET"),
            prefix=os.getenv("AWS_S3_PREFIX", ""),
            region=os.getenv("AWS_REGION"),
            endpoint_url=os.getenv("AWS_ENDPOINT_URL") or None,
        )

    # Google Drive: either explicit kind or a configured folder id.
    if backend in ("gdrive", "drive") or os.getenv("GDRIVE_FOLDER_ID"):
        from cloud.storage_gdrive import GDriveStore

        return GDriveStore(
            folder_id=os.getenv("GDRIVE_FOLDER_ID"),
            creds_json_path=os.getenv("GDRIVE_CREDENTIALS_JSON", ""),
            service_account_json=os.getenv("GDRIVE_SERVICE_ACCOUNT_JSON", ""),
        )

    # Fallback: local cache directory next to this module.
    return LocalStore(Path(__file__).resolve().parent / "remote_cache")
|
|