alpha-engine / core /supabase_client.py
Dharambir Agrawal
HF Space server-only
fd48bc8
from __future__ import annotations
from typing import Any
from supabase import Client, create_client
from core.config import settings
class SupabaseStorageClient:
def __init__(self) -> None:
self.client: Client | None = None
if settings.SUPABASE_URL and settings.SUPABASE_SERVICE_KEY:
self.client = create_client(
settings.SUPABASE_URL,
settings.SUPABASE_SERVICE_KEY,
)
@property
def enabled(self) -> bool:
return self.client is not None
def upload_bytes(
self,
path: str,
content: bytes,
content_type: str = "application/octet-stream",
) -> bool:
if not self.client:
return False
try:
bucket = self.client.storage.from_(settings.SUPABASE_BUCKET)
bucket.upload(
path,
content,
file_options={"content-type": content_type, "upsert": "true"},
)
return True
except Exception:
return False
def download_bytes(self, path: str) -> bytes | None:
if not self.client:
return None
try:
bucket = self.client.storage.from_(settings.SUPABASE_BUCKET)
data: Any = bucket.download(path)
if isinstance(data, bytes):
return data
return None
except Exception:
return None
def remove(self, paths: list[str]) -> bool:
if not self.client or not paths:
return False
try:
bucket = self.client.storage.from_(settings.SUPABASE_BUCKET)
bucket.remove(paths)
return True
except Exception:
return False
supabase_storage = SupabaseStorageClient()