Spaces:
Sleeping
Sleeping
| import logging | |
| import io | |
| import shutil | |
| import tempfile | |
| import time | |
| import zipfile | |
| from pathlib import Path | |
| from typing import Dict, List | |
| import requests | |
| from fastapi import HTTPException | |
| from .config import Settings | |
| from .storage_client import TemplateStorageClient | |
# Dedicated logger for template management. It does not propagate to the root
# logger, so records are emitted only by the handler installed below.
logger = logging.getLogger("template-manager")
logger.propagate = False
logger.setLevel(logging.INFO)

# Attach the stream handler only once: logging.getLogger returns the same
# object on every import, and a second handler would duplicate every record.
if not logger.handlers:
    formatter = logging.Formatter("[%(levelname)s] %(name)s: %(message)s")
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
class TemplateManager:
    """Manage HTML template bundles uploaded from the frontend.

    The manager owns two directories taken from ``Settings``: the *active*
    template directory (the currently installed bundle) and an *archive*
    directory that receives a zip backup of the active template before each
    installation. When the active directory is empty at construction time,
    ``index.html`` is downloaded from cloud storage.
    """

    REQUIRED_PATHS = {"index.html"}
    REQUIRED_FOLDERS = set()  # No longer needed - template is self-contained

    def __init__(self, settings: Settings) -> None:
        """Resolve the configured paths, build the storage client and
        make sure the working directories exist.

        Raises:
            HTTPException: 500 if the active directory is empty and it cannot
                be hydrated from cloud storage.
        """
        self.settings = settings
        self.active_dir = Path(settings.template_root).resolve()
        self.archive_dir = Path(settings.template_archive_dir).resolve()
        self.header_source = Path(settings.header_path).resolve()
        self.storage_client = TemplateStorageClient(
            settings.supabase_url,
            settings.supabase_service_key,
            settings.storage_prefix or "",
        )
        self._ensure_directories()

    def _ensure_directories(self) -> None:
        """Create the active/archive directories and hydrate an empty active one."""
        self.active_dir.mkdir(parents=True, exist_ok=True)
        self.archive_dir.mkdir(parents=True, exist_ok=True)
        if not any(self.active_dir.iterdir()):
            logger.info("Active template is empty, hydrating from cloud storage")
            self._hydrate_from_cloud()

    def get_active_template_path(self) -> Path:
        """Return the active template directory.

        Raises:
            HTTPException: 404 if the directory does not exist.
        """
        if not self.active_dir.exists():
            raise HTTPException(status_code=404, detail="No active template available")
        return self.active_dir

    def validate_zip(self, raw_bytes: bytes) -> Dict[str, object]:
        """Check that ``raw_bytes`` is a zip archive containing every required file.

        Returns:
            A dict with ``valid`` (bool), ``errors`` and ``warnings`` (lists of
            strings). ``warnings`` is currently always empty.

        Raises:
            HTTPException: 400 if the payload is not a readable zip archive.
        """
        errors: List[str] = []
        warnings: List[str] = []
        logger.info("Validating uploaded template (size=%s bytes)", len(raw_bytes))
        try:
            with zipfile.ZipFile(io.BytesIO(raw_bytes)) as bundle:
                # Directory entries end with "/"; only real files count.
                names = [name for name in bundle.namelist() if not name.endswith("/")]
        except zipfile.BadZipFile as exc:
            raise HTTPException(status_code=400, detail=f"Invalid zip file: {exc}") from exc

        # One error per missing file (the former second "index.html" check
        # only duplicated the message produced here).
        errors.extend(
            f"Missing required file: {required}"
            for required in self._missing_required_files(names)
        )
        logger.info("Template validation completed: %s errors, %s warnings", len(errors), len(warnings))
        return {"valid": not errors, "errors": errors, "warnings": warnings}

    @classmethod
    def _missing_required_files(cls, names: List[str]) -> List[str]:
        """Return the ``REQUIRED_PATHS`` entries that no archive member provides.

        A member satisfies a requirement when its path equals the required
        path or ends with it on a path-component boundary, so
        ``site/index.html`` matches ``index.html`` but ``notindex.html`` does
        not. Members under ``__MACOSX`` are ignored, mirroring ``_detect_root``.
        """
        normalized = {cls._normalize_path(name) for name in names}
        candidates = [path for path in normalized if path.split("/", 1)[0] != "__MACOSX"]
        missing: List[str] = []
        for required in sorted(cls.REQUIRED_PATHS):
            suffix = "/" + required
            if not any(path == required or path.endswith(suffix) for path in candidates):
                missing.append(required)
        return missing

    def install_template(self, raw_bytes: bytes) -> Dict[str, str]:
        """Validate, extract and install a template bundle as the active template.

        The current active template is zipped into the archive directory
        before it is replaced.

        Returns:
            ``{"status": "installed", "path": <active directory>}``.

        Raises:
            HTTPException: 400 if the archive is invalid, fails validation, or
                does not hold the required files at its detected root.
        """
        validation = self.validate_zip(raw_bytes)
        if not validation["valid"]:
            raise HTTPException(status_code=400, detail={"errors": validation["errors"]})
        logger.info("Installing new template bundle")
        with tempfile.TemporaryDirectory() as tmp_dir:
            with zipfile.ZipFile(io.BytesIO(raw_bytes)) as bundle:
                bundle.extractall(tmp_dir)
            extracted_root = self._detect_root(Path(tmp_dir))

            # validate_zip accepts a required file at any depth, but only the
            # detected root is installed. Check before touching the active
            # template so a mis-structured bundle cannot replace a working one.
            misplaced = [
                required
                for required in sorted(self.REQUIRED_PATHS)
                if not (extracted_root / required).is_file()
            ]
            if misplaced:
                raise HTTPException(
                    status_code=400,
                    detail={
                        "errors": [
                            f"Required file is not at the template root: {required}"
                            for required in misplaced
                        ]
                    },
                )

            self._backup_active_template()
            if self.active_dir.exists():
                shutil.rmtree(self.active_dir)
            # __MACOSX holds macOS archive metadata, not template content.
            shutil.copytree(
                extracted_root,
                self.active_dir,
                ignore=shutil.ignore_patterns("__MACOSX"),
            )
        logger.info("Template installation complete")
        return {"status": "installed", "path": str(self.active_dir)}

    def list_archives(self) -> List[str]:
        """Return the paths of archived template zips, sorted in descending order."""
        return [str(p) for p in sorted(self.archive_dir.glob("*.zip"), reverse=True)]

    def update_header_image(self, file_bytes: bytes) -> Dict[str, str]:
        """Write the header image into the active template and the header source path."""
        target = self.active_dir / "img" / "header.png"
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(file_bytes)
        self.header_source.parent.mkdir(parents=True, exist_ok=True)
        self.header_source.write_bytes(file_bytes)
        logger.info("Updated header image at %s", target)
        return {"status": "header-updated", "path": str(target)}

    def _backup_active_template(self) -> None:
        """Zip the active template into the archive directory; no-op when it is empty."""
        if not self.active_dir.exists() or not any(self.active_dir.iterdir()):
            return
        timestamp = int(time.time())
        base_name = self.archive_dir / f"previous_template_{timestamp}"
        shutil.make_archive(str(base_name), "zip", root_dir=self.active_dir)
        logger.info("Backed up active template to %s.zip", base_name)

    def _inject_header_image(self) -> None:
        """Copy the header source into the active template, or remove a stale copy."""
        target = self.active_dir / "img" / "header.png"
        target.parent.mkdir(parents=True, exist_ok=True)
        if self.header_source.exists():
            shutil.copy2(self.header_source, target)
        else:
            target.unlink(missing_ok=True)

    def _hydrate_from_cloud(self) -> None:
        """Download ``index.html`` from cloud storage into the active directory.

        Raises:
            HTTPException: 500 if storage is not configured or the download fails.
        """
        if not self.storage_client.enabled:
            raise HTTPException(status_code=500, detail="Cloud storage not configured for templates")
        try:
            logger.info("Downloading index.html from bucket %s", self.storage_client.bucket)
            # Only download index.html - no CSS/fonts/images needed
            self.storage_client.download_file("index.html", self.active_dir / "index.html")
        except Exception as exc:
            logger.exception("Failed to hydrate template from cloud")
            raise HTTPException(status_code=500, detail=f"Failed to download template: {exc}") from exc

    def describe_assets(self) -> Dict[str, List[str]]:
        """Return the active template's files grouped as fonts, images, css and others."""
        fonts = self._list_relative("fonts")
        images = self._list_relative("img")
        css_files = self._list_relative("css")
        others = self._list_relative("", exclude_dirs={"fonts", "img", "css"})
        logger.info(
            "Assets snapshot -> fonts: %s, images: %s, css: %s, others: %s",
            len(fonts),
            len(images),
            len(css_files),
            len(others),
        )
        return {"fonts": fonts, "images": images, "css": css_files, "others": others}

    def get_index_html(self) -> str:
        """Return the active ``index.html`` as text, or ``""`` when it is missing."""
        index_path = self.active_dir / "index.html"
        if not index_path.exists():
            logger.warning("index.html missing at %s", index_path)
            return ""
        logger.info("Loaded index.html (%s bytes)", index_path.stat().st_size)
        return index_path.read_text(encoding="utf-8")

    def _resolve_asset_path(self, subdir: str, filename: str) -> Path:
        """Return the resolved path of ``filename`` inside ``active_dir/subdir``.

        ``filename`` comes from an upload, so it may contain ``..`` segments or
        be absolute. The resolved target must lie strictly inside the asset
        directory.

        Raises:
            HTTPException: 400 if the name would escape the asset directory or
                names the directory itself (e.g. an empty name).
        """
        base = (self.active_dir / subdir).resolve()
        candidate = (base / filename).resolve()
        if candidate == base or not candidate.is_relative_to(base):
            raise HTTPException(status_code=400, detail=f"Invalid file name: {filename}")
        return candidate

    def _save_asset(self, subdir: str, label: str, filename: str, data: bytes) -> Dict[str, str]:
        """Write ``data`` to ``active_dir/subdir/filename`` and report the relative path."""
        target = self._resolve_asset_path(subdir, filename)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(data)
        logger.info("Saved %s %s locally (not uploaded to Supabase)", label, filename)
        return {"status": "saved", "path": str(target.relative_to(self.active_dir.resolve()))}

    def save_font(self, filename: str, data: bytes) -> Dict[str, str]:
        """Save font file locally (no longer uploaded to Supabase).

        Raises:
            HTTPException: 400 if ``filename`` points outside the fonts directory.
        """
        return self._save_asset("fonts", "font", filename, data)

    def save_image(self, filename: str, data: bytes) -> Dict[str, str]:
        """Save image file locally (no longer uploaded to Supabase).

        Raises:
            HTTPException: 400 if ``filename`` points outside the img directory.
        """
        return self._save_asset("img", "image", filename, data)

    def save_css(self, filename: str, data: bytes) -> Dict[str, str]:
        """Save CSS file locally (no longer uploaded to Supabase).

        Raises:
            HTTPException: 400 if ``filename`` points outside the css directory.
        """
        return self._save_asset("css", "CSS", filename, data)

    def save_index_html(self, data: bytes | str) -> Dict[str, str]:
        """Write ``index.html`` locally and push it to cloud storage when enabled.

        The upload is best-effort: a failure is logged and the local save
        still counts as successful.
        """
        target = self.active_dir / "index.html"
        if isinstance(data, str):
            data = data.encode("utf-8")
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(data)
        if self.storage_client.enabled:
            try:
                self.storage_client.upload_object("index.html", data, content_type="text/html")
            except Exception as exc:
                logger.error("Failed to push index.html to Supabase: %s", exc)
        return {"status": "saved", "path": str(target)}

    def _list_relative(self, subdir: str, exclude_dirs: set | None = None) -> List[str]:
        """List files under ``active_dir/subdir`` as paths relative to ``active_dir``.

        Args:
            subdir: Sub-directory to scan; ``""`` scans the whole active directory.
            exclude_dirs: Top-level directory names (relative to ``active_dir``)
                whose files are skipped.

        Returns:
            Relative paths in sorted order; an empty list when the directory
            does not exist.
        """
        base = self.active_dir / subdir if subdir else self.active_dir
        if not base.exists():
            logger.warning("Requested subdir %s does not exist", subdir)
            return []
        results = []
        for path in base.rglob("*"):
            if not path.is_file():
                continue
            rel = path.relative_to(self.active_dir)
            if exclude_dirs and rel.parts and rel.parts[0] in exclude_dirs:
                continue
            results.append(str(rel))
        # rglob order depends on the filesystem; sort for a stable listing.
        results.sort()
        return results

    @staticmethod
    def _detect_root(temp_dir: Path) -> Path:
        """Return the directory that holds the template inside an extracted bundle.

        When the bundle (ignoring ``__MACOSX``) consists of a single top-level
        directory, that directory is the root; otherwise ``temp_dir`` itself is.
        """
        children = [child for child in temp_dir.iterdir() if child.name != "__MACOSX"]
        if len(children) == 1 and children[0].is_dir():
            return children[0]
        return temp_dir

    @staticmethod
    def _normalize_path(name: str) -> str:
        """Strip leading and trailing slashes from an archive member name."""
        return name.strip("/")