import logging import io import shutil import tempfile import time import zipfile from pathlib import Path from typing import Dict, List import requests from fastapi import HTTPException from .config import Settings from .storage_client import TemplateStorageClient logger = logging.getLogger("template-manager") if not logger.handlers: handler = logging.StreamHandler() formatter = logging.Formatter("[%(levelname)s] %(name)s: %(message)s") handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) logger.propagate = False class TemplateManager: """Manage HTML template bundles uploaded from the frontend.""" REQUIRED_PATHS = {"index.html"} REQUIRED_FOLDERS = set() # No longer needed - template is self-contained def __init__(self, settings: Settings) -> None: self.settings = settings self.active_dir = Path(settings.template_root).resolve() self.archive_dir = Path(settings.template_archive_dir).resolve() self.header_source = Path(settings.header_path).resolve() self.storage_client = TemplateStorageClient( settings.supabase_url, settings.supabase_service_key, settings.storage_prefix or "", ) self._ensure_directories() def _ensure_directories(self) -> None: self.active_dir.mkdir(parents=True, exist_ok=True) self.archive_dir.mkdir(parents=True, exist_ok=True) had_content = any(self.active_dir.iterdir()) if not had_content: logger.info("Active template is empty, hydrating from cloud storage") self._hydrate_from_cloud() def get_active_template_path(self) -> Path: if not self.active_dir.exists(): raise HTTPException(status_code=404, detail="No active template available") return self.active_dir def validate_zip(self, raw_bytes: bytes) -> Dict[str, object]: errors: List[str] = [] warnings: List[str] = [] try: logger.info("Validating uploaded template (size=%s bytes)", len(raw_bytes)) with zipfile.ZipFile(io.BytesIO(raw_bytes)) as bundle: names = [name for name in bundle.namelist() if not name.endswith("/")] normalized = {self._normalize_path(name) for name in names} for required in self.REQUIRED_PATHS: if not any(path.endswith(required) for path in normalized): errors.append(f"Missing required file: {required}") # No longer require CSS/fonts/images folders - template is self-contained if not any(name.lower().endswith("index.html") for name in names): errors.append("index.html not found in archive") except zipfile.BadZipFile as exc: raise HTTPException(status_code=400, detail=f"Invalid zip file: {exc}") from exc logger.info("Template validation completed: %s errors, %s warnings", len(errors), len(warnings)) return {"valid": not errors, "errors": errors, "warnings": warnings} def install_template(self, raw_bytes: bytes) -> Dict[str, str]: validation = self.validate_zip(raw_bytes) if not validation["valid"]: raise HTTPException(status_code=400, detail={"errors": validation["errors"]}) logger.info("Installing new template bundle") with tempfile.TemporaryDirectory() as tmp_dir: with zipfile.ZipFile(io.BytesIO(raw_bytes)) as bundle: bundle.extractall(tmp_dir) extracted_root = self._detect_root(Path(tmp_dir)) self._backup_active_template() if self.active_dir.exists(): shutil.rmtree(self.active_dir) shutil.copytree(extracted_root, self.active_dir) logger.info("Template installation complete") return {"status": "installed", "path": str(self.active_dir)} def list_archives(self) -> List[str]: return [str(p) for p in sorted(self.archive_dir.glob("*.zip"), reverse=True)] def update_header_image(self, file_bytes: bytes) -> Dict[str, str]: target = self.active_dir / "img" / "header.png" target.parent.mkdir(parents=True, exist_ok=True) target.write_bytes(file_bytes) self.header_source.parent.mkdir(parents=True, exist_ok=True) self.header_source.write_bytes(file_bytes) logger.info("Updated header image at %s", target) return {"status": "header-updated", "path": str(target)} def _backup_active_template(self) -> None: if not self.active_dir.exists() or not any(self.active_dir.iterdir()): return timestamp = int(time.time()) base_name = self.archive_dir / f"previous_template_{timestamp}" shutil.make_archive(str(base_name), "zip", root_dir=self.active_dir) logger.info("Backed up active template to %s.zip", base_name) def _inject_header_image(self) -> None: target = self.active_dir / "img" / "header.png" target.parent.mkdir(parents=True, exist_ok=True) if self.header_source.exists(): shutil.copy2(self.header_source, target) else: target.unlink(missing_ok=True) def _hydrate_from_cloud(self) -> None: if not self.storage_client.enabled: raise HTTPException(status_code=500, detail="Cloud storage not configured for templates") try: logger.info("Downloading index.html from bucket %s", self.storage_client.bucket) # Only download index.html - no CSS/fonts/images needed self.storage_client.download_file("index.html", self.active_dir / "index.html") except Exception as exc: logger.exception("Failed to hydrate template from cloud") raise HTTPException(status_code=500, detail=f"Failed to download template: {exc}") from exc def describe_assets(self) -> Dict[str, List[str]]: fonts = self._list_relative("fonts") images = self._list_relative("img") css_files = self._list_relative("css") others = self._list_relative("", exclude_dirs={"fonts", "img", "css"}) logger.info( "Assets snapshot -> fonts: %s, images: %s, css: %s, others: %s", len(fonts), len(images), len(css_files), len(others), ) return {"fonts": fonts, "images": images, "css": css_files, "others": others} def get_index_html(self) -> str: index_path = self.active_dir / "index.html" if not index_path.exists(): logger.warning("index.html missing at %s", index_path) return "" logger.info("Loaded index.html (%s bytes)", index_path.stat().st_size) return index_path.read_text(encoding="utf-8") def save_font(self, filename: str, data: bytes) -> Dict[str, str]: """Save font file locally (no longer uploaded to Supabase).""" target = self.active_dir / "fonts" / filename target.parent.mkdir(parents=True, exist_ok=True) target.write_bytes(data) logger.info("Saved font %s locally (not uploaded to Supabase)", filename) return {"status": "saved", "path": str(target.relative_to(self.active_dir))} def save_image(self, filename: str, data: bytes) -> Dict[str, str]: """Save image file locally (no longer uploaded to Supabase).""" target = self.active_dir / "img" / filename target.parent.mkdir(parents=True, exist_ok=True) target.write_bytes(data) logger.info("Saved image %s locally (not uploaded to Supabase)", filename) return {"status": "saved", "path": str(target.relative_to(self.active_dir))} def save_css(self, filename: str, data: bytes) -> Dict[str, str]: """Save CSS file locally (no longer uploaded to Supabase).""" target = self.active_dir / "css" / filename target.parent.mkdir(parents=True, exist_ok=True) target.write_bytes(data) logger.info("Saved CSS %s locally (not uploaded to Supabase)", filename) return {"status": "saved", "path": str(target.relative_to(self.active_dir))} def save_index_html(self, data: bytes | str) -> Dict[str, str]: target = self.active_dir / "index.html" if isinstance(data, str): data = data.encode("utf-8") target.write_bytes(data) if self.storage_client.enabled: try: self.storage_client.upload_object("index.html", data, content_type="text/html") except Exception as exc: logger.error("Failed to push index.html to Supabase: %s", exc) return {"status": "saved", "path": str(target)} def _list_relative(self, subdir: str, exclude_dirs: set | None = None) -> List[str]: base = self.active_dir / subdir if subdir else self.active_dir if not base.exists(): logger.warning("Requested subdir %s does not exist", subdir) return [] results = [] for path in base.rglob("*"): if path.is_file(): rel = path.relative_to(self.active_dir) parts = rel.parts if exclude_dirs and parts and parts[0] in exclude_dirs: continue results.append(str(rel)) return results @staticmethod def _detect_root(temp_dir: Path) -> Path: children = [child for child in temp_dir.iterdir() if child.name != "__MACOSX"] if len(children) == 1 and children[0].is_dir(): return children[0] return temp_dir @staticmethod def _normalize_path(name: str) -> str: return name.strip("/")