polygrapher / app /storage_client.py
dhruv575
Easier storage
0e309b8
import logging
from pathlib import Path
from typing import List, Optional
import requests
class TemplateStorageClient:
"""Supabase Storage helper (root only, recursively walks folders)."""
def __init__(self, url: Optional[str], key: Optional[str], prefix: str = "") -> None:
self.url = url
self.key = key
self.bucket = "Polygrapher"
self.logger = logging.getLogger("template-storage")
@property
def enabled(self) -> bool:
return bool(self.url and self.key)
def download_template(self, destination: Path) -> None:
"""Download all template files from Supabase (legacy method - use download_file for single files)."""
if not self.enabled:
raise RuntimeError("Supabase storage not configured")
objects = self._list_all_objects()
if not objects:
raise RuntimeError("No objects found in Supabase bucket")
if destination.exists():
for child in destination.iterdir():
if child.is_file():
child.unlink()
else:
self._rm_dir(child)
else:
destination.mkdir(parents=True, exist_ok=True)
for rel_path in objects:
data = self._download_object(rel_path)
target = destination / rel_path
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(data)
self.logger.info("Downloaded %s", rel_path)
def download_file(self, filename: str, destination: Path) -> None:
"""Download a single file from Supabase Storage."""
if not self.enabled:
raise RuntimeError("Supabase storage not configured")
data = self._download_object(filename)
destination.parent.mkdir(parents=True, exist_ok=True)
destination.write_bytes(data)
self.logger.info("Downloaded %s to %s", filename, destination)
def _list_all_objects(self) -> List[str]:
if not self.enabled:
return []
endpoint = f"{self.url}/storage/v1/object/list/{self.bucket}"
files: List[str] = []
stack = [""]
seen_prefixes = set()
while stack:
prefix = stack.pop()
if prefix in seen_prefixes:
continue
seen_prefixes.add(prefix)
response = requests.post(
endpoint,
json={"prefix": prefix, "limit": 1000},
headers={"apikey": self.key, "Authorization": f"Bearer {self.key}"},
timeout=20,
)
response.raise_for_status()
payload = response.json()
if isinstance(payload, list):
payload = {"items": payload}
for entry in payload.get("items", []):
name = entry.get("name")
if not name:
continue
normalized = (f"{prefix}{name}").lstrip("/")
if self._is_file(entry):
files.append(normalized.rstrip("/"))
else:
child_prefix = normalized.rstrip("/") + "/"
stack.append(child_prefix)
for folder in payload.get("folders", []):
child_prefix = (f"{prefix}{folder}").lstrip("/").rstrip("/") + "/"
stack.append(child_prefix)
self.logger.info("Supabase returned %s files total", len(files))
return files
def upload_object(self, name: str, data: bytes, content_type: str = "text/html") -> None:
"""Upload/overwrite a single object in Supabase Storage."""
if not self.enabled:
self.logger.warning("Supabase upload skipped (storage disabled) for %s", name)
return
url = f"{self.url}/storage/v1/object/{self.bucket}/{name}"
headers = {
"apikey": self.key,
"Authorization": f"Bearer {self.key}",
"x-upsert": "true",
"Content-Type": content_type,
}
response = requests.post(url, headers=headers, data=data, timeout=20)
try:
response.raise_for_status()
self.logger.info("Uploaded %s to Supabase bucket %s", name, self.bucket)
except Exception:
self.logger.error(
"Failed to upload %s to Supabase (status %s, body=%s)",
name,
response.status_code,
response.text[:200],
)
raise
@staticmethod
def _is_file(entry: dict) -> bool:
if entry.get("id"):
return True
metadata = entry.get("metadata")
if isinstance(metadata, dict) and any(key in metadata for key in ("size", "mimetype", "cacheControl")):
return True
name = entry.get("name") or ""
return "." in name and not name.endswith("/")
def _download_object(self, name: str) -> bytes:
url = f"{self.url}/storage/v1/object/{self.bucket}/{name}"
response = requests.get(url, headers={"apikey": self.key, "Authorization": f"Bearer {self.key}"}, timeout=20)
response.raise_for_status()
return response.content
def _rm_dir(self, path: Path) -> None:
for child in path.iterdir():
if child.is_file():
child.unlink()
else:
self._rm_dir(child)
path.rmdir()