kalshifier / app /template_manager.py
dhruv575
Initial build
818d48b
import logging
import io
import shutil
import tempfile
import time
import zipfile
from pathlib import Path
from typing import Dict, List
import requests
from fastapi import HTTPException
from .config import Settings
from .storage_client import TemplateStorageClient
logger = logging.getLogger("template-manager")
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter("[%(levelname)s] %(name)s: %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False
class TemplateManager:
"""Manage HTML template bundles uploaded from the frontend."""
REQUIRED_PATHS = {"index.html"}
REQUIRED_FOLDERS = set() # No longer needed - template is self-contained
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.active_dir = Path(settings.template_root).resolve()
self.archive_dir = Path(settings.template_archive_dir).resolve()
self.header_source = Path(settings.header_path).resolve()
self.storage_client = TemplateStorageClient(
settings.supabase_url,
settings.supabase_service_key,
settings.storage_prefix or "",
)
self._ensure_directories()
def _ensure_directories(self) -> None:
self.active_dir.mkdir(parents=True, exist_ok=True)
self.archive_dir.mkdir(parents=True, exist_ok=True)
had_content = any(self.active_dir.iterdir())
if not had_content:
logger.info("Active template is empty, hydrating from cloud storage")
self._hydrate_from_cloud()
def get_active_template_path(self) -> Path:
if not self.active_dir.exists():
raise HTTPException(status_code=404, detail="No active template available")
return self.active_dir
def validate_zip(self, raw_bytes: bytes) -> Dict[str, object]:
errors: List[str] = []
warnings: List[str] = []
try:
logger.info("Validating uploaded template (size=%s bytes)", len(raw_bytes))
with zipfile.ZipFile(io.BytesIO(raw_bytes)) as bundle:
names = [name for name in bundle.namelist() if not name.endswith("/")]
normalized = {self._normalize_path(name) for name in names}
for required in self.REQUIRED_PATHS:
if not any(path.endswith(required) for path in normalized):
errors.append(f"Missing required file: {required}")
# No longer require CSS/fonts/images folders - template is self-contained
if not any(name.lower().endswith("index.html") for name in names):
errors.append("index.html not found in archive")
except zipfile.BadZipFile as exc:
raise HTTPException(status_code=400, detail=f"Invalid zip file: {exc}") from exc
logger.info("Template validation completed: %s errors, %s warnings", len(errors), len(warnings))
return {"valid": not errors, "errors": errors, "warnings": warnings}
def install_template(self, raw_bytes: bytes) -> Dict[str, str]:
validation = self.validate_zip(raw_bytes)
if not validation["valid"]:
raise HTTPException(status_code=400, detail={"errors": validation["errors"]})
logger.info("Installing new template bundle")
with tempfile.TemporaryDirectory() as tmp_dir:
with zipfile.ZipFile(io.BytesIO(raw_bytes)) as bundle:
bundle.extractall(tmp_dir)
extracted_root = self._detect_root(Path(tmp_dir))
self._backup_active_template()
if self.active_dir.exists():
shutil.rmtree(self.active_dir)
shutil.copytree(extracted_root, self.active_dir)
logger.info("Template installation complete")
return {"status": "installed", "path": str(self.active_dir)}
def list_archives(self) -> List[str]:
return [str(p) for p in sorted(self.archive_dir.glob("*.zip"), reverse=True)]
def update_header_image(self, file_bytes: bytes) -> Dict[str, str]:
target = self.active_dir / "img" / "header.png"
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(file_bytes)
self.header_source.parent.mkdir(parents=True, exist_ok=True)
self.header_source.write_bytes(file_bytes)
logger.info("Updated header image at %s", target)
return {"status": "header-updated", "path": str(target)}
def _backup_active_template(self) -> None:
if not self.active_dir.exists() or not any(self.active_dir.iterdir()):
return
timestamp = int(time.time())
base_name = self.archive_dir / f"previous_template_{timestamp}"
shutil.make_archive(str(base_name), "zip", root_dir=self.active_dir)
logger.info("Backed up active template to %s.zip", base_name)
def _inject_header_image(self) -> None:
target = self.active_dir / "img" / "header.png"
target.parent.mkdir(parents=True, exist_ok=True)
if self.header_source.exists():
shutil.copy2(self.header_source, target)
else:
target.unlink(missing_ok=True)
def _hydrate_from_cloud(self) -> None:
if not self.storage_client.enabled:
raise HTTPException(status_code=500, detail="Cloud storage not configured for templates")
try:
logger.info("Downloading index.html from bucket %s", self.storage_client.bucket)
# Only download index.html - no CSS/fonts/images needed
self.storage_client.download_file("index.html", self.active_dir / "index.html")
except Exception as exc:
logger.exception("Failed to hydrate template from cloud")
raise HTTPException(status_code=500, detail=f"Failed to download template: {exc}") from exc
def describe_assets(self) -> Dict[str, List[str]]:
fonts = self._list_relative("fonts")
images = self._list_relative("img")
css_files = self._list_relative("css")
others = self._list_relative("", exclude_dirs={"fonts", "img", "css"})
logger.info(
"Assets snapshot -> fonts: %s, images: %s, css: %s, others: %s",
len(fonts),
len(images),
len(css_files),
len(others),
)
return {"fonts": fonts, "images": images, "css": css_files, "others": others}
def get_index_html(self) -> str:
index_path = self.active_dir / "index.html"
if not index_path.exists():
logger.warning("index.html missing at %s", index_path)
return ""
logger.info("Loaded index.html (%s bytes)", index_path.stat().st_size)
return index_path.read_text(encoding="utf-8")
def save_font(self, filename: str, data: bytes) -> Dict[str, str]:
"""Save font file locally (no longer uploaded to Supabase)."""
target = self.active_dir / "fonts" / filename
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(data)
logger.info("Saved font %s locally (not uploaded to Supabase)", filename)
return {"status": "saved", "path": str(target.relative_to(self.active_dir))}
def save_image(self, filename: str, data: bytes) -> Dict[str, str]:
"""Save image file locally (no longer uploaded to Supabase)."""
target = self.active_dir / "img" / filename
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(data)
logger.info("Saved image %s locally (not uploaded to Supabase)", filename)
return {"status": "saved", "path": str(target.relative_to(self.active_dir))}
def save_css(self, filename: str, data: bytes) -> Dict[str, str]:
"""Save CSS file locally (no longer uploaded to Supabase)."""
target = self.active_dir / "css" / filename
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(data)
logger.info("Saved CSS %s locally (not uploaded to Supabase)", filename)
return {"status": "saved", "path": str(target.relative_to(self.active_dir))}
def save_index_html(self, data: bytes | str) -> Dict[str, str]:
target = self.active_dir / "index.html"
if isinstance(data, str):
data = data.encode("utf-8")
target.write_bytes(data)
if self.storage_client.enabled:
try:
self.storage_client.upload_object("index.html", data, content_type="text/html")
except Exception as exc:
logger.error("Failed to push index.html to Supabase: %s", exc)
return {"status": "saved", "path": str(target)}
def _list_relative(self, subdir: str, exclude_dirs: set | None = None) -> List[str]:
base = self.active_dir / subdir if subdir else self.active_dir
if not base.exists():
logger.warning("Requested subdir %s does not exist", subdir)
return []
results = []
for path in base.rglob("*"):
if path.is_file():
rel = path.relative_to(self.active_dir)
parts = rel.parts
if exclude_dirs and parts and parts[0] in exclude_dirs:
continue
results.append(str(rel))
return results
@staticmethod
def _detect_root(temp_dir: Path) -> Path:
children = [child for child in temp_dir.iterdir() if child.name != "__MACOSX"]
if len(children) == 1 and children[0].is_dir():
return children[0]
return temp_dir
@staticmethod
def _normalize_path(name: str) -> str:
return name.strip("/")