# Tan Zi Xu
# gdrive integration
# 695209d
# storage.py
from __future__ import annotations
import io, tempfile, os
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional
@dataclass
class BlobInfo:
    """Metadata for a single object (or directory entry) in a BlobStore.

    Only ``key`` is required; backends fill in the rest when they can.
    """

    key: str  # backend-relative identifier, e.g. a path under the store root
    size: Optional[int] = None  # size in bytes; None when unknown or a directory
    modified: Optional[str] = None  # last-modified timestamp string — presumably set by cloud backends; LocalStore does not populate it
    is_dir: bool = False  # True when the entry represents a directory/prefix rather than an object
class BlobStore:
    """Abstract key -> bytes object store.

    Concrete backends (local filesystem, S3, Google Drive, ...) implement
    ``list``, ``read_bytes``, ``write_bytes`` and ``head``; ``exists`` and
    ``download_to`` are generic helpers layered on those primitives.
    """

    def list(self, prefix: str = "", recursive: bool = False) -> Iterable[BlobInfo]:
        """Yield BlobInfo entries under *prefix* (one level unless *recursive*)."""
        raise NotImplementedError

    def read_bytes(self, key: str) -> bytes:
        """Return the full contents of *key*."""
        raise NotImplementedError

    def write_bytes(self, key: str, data: bytes, content_type: Optional[str] = None) -> None:
        """Store *data* under *key*; *content_type* is an optional backend hint."""
        raise NotImplementedError

    def head(self, key: str) -> BlobInfo:
        """Return metadata for *key*; raise if it does not exist."""
        raise NotImplementedError

    def exists(self, key: str) -> bool:
        """Return True if *key* is present in the store.

        Backends signal "missing" by raising from ``head``, and each backend
        raises its own exception type, so any failure is treated as absence.
        ``NotImplementedError`` is the exception: swallowing it would make an
        unimplemented ``head`` look like a universally empty store, so it is
        re-raised (fix: the old code returned False for it too).
        """
        try:
            self.head(key)
        except NotImplementedError:
            raise  # a missing implementation is a bug, not a missing key
        except Exception:
            return False
        return True

    def download_to(self, key: str, dest_path: Path) -> Path:
        """Copy *key* to local *dest_path*, creating parent dirs; return the path."""
        data = self.read_bytes(key)
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        dest_path.write_bytes(data)
        return dest_path
# ---------- Local FS baseline ----------
class LocalStore(BlobStore):
    """Filesystem-backed BlobStore rooted at a guaranteed-writable directory."""

    def __init__(self, root: str | Path | None = None):
        """Pick a writable root directory.

        Priority:
          1) explicit *root* argument, when provided
          2) the APP_DATA_DIR environment variable, when set
          3) <tempdir>/label_assistant (always writable on Spaces/containers)

        If the chosen directory cannot be created, fall back to option 3.
        """
        fallback = Path(tempfile.gettempdir()) / "label_assistant"
        if root:
            chosen = Path(root)
        else:
            env_dir = os.getenv("APP_DATA_DIR")
            chosen = Path(env_dir) if env_dir else fallback
        try:
            chosen.mkdir(parents=True, exist_ok=True)
        except Exception:
            # e.g. read-only mount — retreat to the temp-dir fallback
            chosen = fallback
            chosen.mkdir(parents=True, exist_ok=True)
        self.root = chosen
        print(f"[LocalStore] using {self.root}", flush=True)

    def _p(self, key: str) -> Path:
        # Resolve a store key to a filesystem path under the root.
        return self.root / key

    def list(self, prefix: str = "", recursive: bool = False) -> Iterable[BlobInfo]:
        """Yield entries under *prefix*; files only when *recursive*."""
        start = self._p(prefix) if prefix else self.root
        if start.is_file():
            # A prefix naming a single file yields exactly that file.
            yield BlobInfo(prefix, size=start.stat().st_size)
        elif start.exists():
            if recursive:
                for path in start.rglob("*"):
                    if path.is_file():
                        yield BlobInfo(
                            str(path.relative_to(self.root)),
                            size=path.stat().st_size,
                        )
            else:
                # One level: include directories so callers can drill down.
                for path in start.glob("*"):
                    yield BlobInfo(
                        str(path.relative_to(self.root)),
                        size=path.stat().st_size if path.is_file() else None,
                        is_dir=path.is_dir(),
                    )

    def read_bytes(self, key: str) -> bytes:
        """Return the contents of *key*; raises FileNotFoundError if absent."""
        return self._p(key).read_bytes()

    def write_bytes(self, key: str, data: bytes, content_type: Optional[str] = None) -> None:
        """Write *data* under *key*, creating parent directories as needed."""
        target = self._p(key)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(data)

    def head(self, key: str) -> BlobInfo:
        """Return metadata for *key*; stat() raises for missing paths."""
        path = self._p(key)
        st = path.stat()
        return BlobInfo(
            key,
            size=st.st_size if path.is_file() else None,
            is_dir=path.is_dir(),
        )
# ---------- Factory (lazy imports to avoid circulars) ----------
def get_store_from_env(kind: Optional[str] = None) -> BlobStore:
    """
    Choose a storage backend from *kind* or the environment.

    Selection order:
      - explicit kind (argument or BLOB_BACKEND): 's3' -> S3,
        'gdrive'/'drive' -> Google Drive, anything else named -> LocalStore.
        An explicit choice always wins.
      - no kind given: auto-detect via env presence — AWS_S3_BUCKET -> S3,
        GDRIVE_FOLDER_ID -> Google Drive.
      - default -> LocalStore at ./remote_cache
    """
    kind = (kind or os.getenv("BLOB_BACKEND", "")).lower()
    # Fix: the old code OR-ed the explicit kind with env detection, so e.g.
    # kind='local' was silently overridden by a stray AWS_S3_BUCKET. Env
    # presence is consulted only when no backend was explicitly named.
    if kind == "s3" or (not kind and os.getenv("AWS_S3_BUCKET")):
        from cloud.storage_s3 import S3Store  # lazy import to avoid circulars

        return S3Store(
            bucket=os.getenv("AWS_S3_BUCKET"),
            prefix=os.getenv("AWS_S3_PREFIX", ""),
            region=os.getenv("AWS_REGION"),
            endpoint_url=os.getenv("AWS_ENDPOINT_URL") or None,
        )
    if kind in ("gdrive", "drive") or (not kind and os.getenv("GDRIVE_FOLDER_ID")):
        from cloud.storage_gdrive import GDriveStore  # lazy import to avoid circulars

        return GDriveStore(
            folder_id=os.getenv("GDRIVE_FOLDER_ID"),
            creds_json_path=os.getenv("GDRIVE_CREDENTIALS_JSON", ""),
            service_account_json=os.getenv("GDRIVE_SERVICE_ACCOUNT_JSON", ""),
        )
    # Anything else (including an explicit 'local') uses the local cache dir.
    return LocalStore(Path(__file__).resolve().parent / "remote_cache")