Spaces:
Sleeping
Sleeping
import logging
from pathlib import Path
from typing import List, Optional

import requests
class TemplateStorageClient:
    """Supabase Storage helper (root only, recursively walks folders).

    Thin wrapper over the Supabase Storage REST API for one fixed bucket
    ("Polygrapher"): list, download, and upsert-upload objects.
    """

    def __init__(self, url: Optional[str], key: Optional[str], prefix: str = "") -> None:
        """Create a client.

        Args:
            url: Supabase project base URL; ``None`` disables storage.
            key: Supabase API key; ``None`` disables storage.
            prefix: Path prefix inside the bucket. NOTE(review): the original
                accepted this argument but never stored it; it is now kept on
                the instance for callers, though the listing methods still
                start at the bucket root.
        """
        self.url = url
        self.key = key
        self.prefix = prefix  # BUG FIX: was accepted but silently discarded
        self.bucket = "Polygrapher"
        self.logger = logging.getLogger("template-storage")

    def enabled(self) -> bool:
        """Return True when both URL and key are configured."""
        return bool(self.url and self.key)

    def download_template(self, destination: Path) -> None:
        """Download all template files from Supabase (legacy method - use download_file for single files).

        Clears ``destination`` first so stale files do not linger, then
        mirrors every object in the bucket into it.

        Raises:
            RuntimeError: if storage is not configured or the bucket is empty.
            requests.HTTPError: on any failed API call.
        """
        # BUG FIX: original tested `if not self.enabled:` — a bound method is
        # always truthy, so the guard could never fire. Must call it.
        if not self.enabled():
            raise RuntimeError("Supabase storage not configured")
        objects = self._list_all_objects()
        if not objects:
            raise RuntimeError("No objects found in Supabase bucket")
        if destination.exists():
            # Wipe previous contents (files and subdirectories) in place.
            for child in destination.iterdir():
                if child.is_file():
                    child.unlink()
                else:
                    self._rm_dir(child)
        else:
            destination.mkdir(parents=True, exist_ok=True)
        for rel_path in objects:
            data = self._download_object(rel_path)
            target = destination / rel_path
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(data)
            self.logger.info("Downloaded %s", rel_path)

    def download_file(self, filename: str, destination: Path) -> None:
        """Download a single file from Supabase Storage.

        Raises:
            RuntimeError: if storage is not configured.
            requests.HTTPError: if the object cannot be fetched.
        """
        if not self.enabled():  # BUG FIX: was `not self.enabled` (dead guard)
            raise RuntimeError("Supabase storage not configured")
        data = self._download_object(filename)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_bytes(data)
        self.logger.info("Downloaded %s to %s", filename, destination)

    def _list_all_objects(self) -> List[str]:
        """Return the relative paths of every object in the bucket.

        Walks folders depth-first via repeated ``object/list`` calls; a
        ``seen_prefixes`` set guards against re-listing the same folder.
        Returns an empty list when storage is disabled.
        """
        if not self.enabled():  # BUG FIX: was `not self.enabled` (dead guard)
            return []
        endpoint = f"{self.url}/storage/v1/object/list/{self.bucket}"
        files: List[str] = []
        stack = [""]
        seen_prefixes = set()
        while stack:
            prefix = stack.pop()
            if prefix in seen_prefixes:
                continue
            seen_prefixes.add(prefix)
            response = requests.post(
                endpoint,
                json={"prefix": prefix, "limit": 1000},
                headers={"apikey": self.key, "Authorization": f"Bearer {self.key}"},
                timeout=20,
            )
            response.raise_for_status()
            payload = response.json()
            # Some API responses are a bare list rather than {"items": [...]}.
            if isinstance(payload, list):
                payload = {"items": payload}
            for entry in payload.get("items", []):
                name = entry.get("name")
                if not name:
                    continue
                normalized = (f"{prefix}{name}").lstrip("/")
                if self._is_file(entry):
                    files.append(normalized.rstrip("/"))
                else:
                    # Folder entry: queue it for its own listing pass.
                    stack.append(normalized.rstrip("/") + "/")
            for folder in payload.get("folders", []):
                stack.append((f"{prefix}{folder}").lstrip("/").rstrip("/") + "/")
        self.logger.info("Supabase returned %s files total", len(files))
        return files

    def upload_object(self, name: str, data: bytes, content_type: str = "text/html") -> None:
        """Upload/overwrite a single object in Supabase Storage.

        Best-effort when storage is disabled (logs a warning and returns);
        re-raises after logging when the API call fails.
        """
        if not self.enabled():  # BUG FIX: was `not self.enabled` (dead guard)
            self.logger.warning("Supabase upload skipped (storage disabled) for %s", name)
            return
        url = f"{self.url}/storage/v1/object/{self.bucket}/{name}"
        headers = {
            "apikey": self.key,
            "Authorization": f"Bearer {self.key}",
            "x-upsert": "true",  # overwrite an existing object instead of failing
            "Content-Type": content_type,
        }
        response = requests.post(url, headers=headers, data=data, timeout=20)
        try:
            response.raise_for_status()
            self.logger.info("Uploaded %s to Supabase bucket %s", name, self.bucket)
        except Exception:
            self.logger.error(
                "Failed to upload %s to Supabase (status %s, body=%s)",
                name,
                response.status_code,
                response.text[:200],
            )
            raise

    @staticmethod
    def _is_file(entry: dict) -> bool:
        """Heuristically decide whether a listing entry is a file (vs folder).

        BUG FIX: originally a plain method defined without ``self``, so the
        call ``self._is_file(entry)`` raised TypeError (2 args for 1). It
        touches no instance state, so it is now a staticmethod.
        """
        if entry.get("id"):
            return True
        metadata = entry.get("metadata")
        if isinstance(metadata, dict) and any(key in metadata for key in ("size", "mimetype", "cacheControl")):
            return True
        # Fallback: a dotted name that does not end in "/" looks like a file.
        name = entry.get("name") or ""
        return "." in name and not name.endswith("/")

    def _download_object(self, name: str) -> bytes:
        """Fetch one object's bytes; raises requests.HTTPError on failure."""
        url = f"{self.url}/storage/v1/object/{self.bucket}/{name}"
        response = requests.get(url, headers={"apikey": self.key, "Authorization": f"Bearer {self.key}"}, timeout=20)
        response.raise_for_status()
        return response.content

    def _rm_dir(self, path: Path) -> None:
        """Recursively delete a directory tree (files first, then the dir)."""
        for child in path.iterdir():
            if child.is_file():
                child.unlink()
            else:
                self._rm_dir(child)
        path.rmdir()