""" Backup & Restore API — talks directly to HuggingFace Dataset API. Flow: 1. Space asks Worker for HF credentials (token, repo, user path prefix) 2. Space uploads/downloads/lists archives directly via HuggingFace API Requires env var: ADMIN_API_URL - URL of the Cloudflare Worker admin API """ import os import shutil import tarfile from datetime import datetime from pathlib import Path import httpx from fastapi import APIRouter, HTTPException, BackgroundTasks, Request from config import DATA_DIR, ADMIN_API_URL, BACKUP_DIR from storage import load_meta, save_meta, validate_zone_name router = APIRouter(prefix="/api/backup", tags=["backup"]) HF_API = "https://huggingface.co/api" def _get_token(request: Request) -> str: """Extract JWT token from request Authorization header.""" auth = request.headers.get("Authorization", "") if auth.startswith("Bearer "): return auth[7:] return "" def _get_credentials(token: str) -> dict: """Get HF credentials from Worker. Returns {hf_token, repo, path_prefix}.""" with httpx.Client(timeout=15) as client: resp = client.get( f"{ADMIN_API_URL}/backup/credentials", headers={"Authorization": f"Bearer {token}"}, ) if resp.status_code != 200: data = resp.json() if "application/json" in resp.headers.get("content-type", "") else {"error": resp.text} raise ValueError(data.get("error", f"Worker error: {resp.status_code}")) return resp.json() def _log_action(token: str, zone_name: str, action: str, status: str, file_path: str = ""): """Log backup/restore action to Worker (best-effort).""" try: with httpx.Client(timeout=10) as client: client.post( f"{ADMIN_API_URL}/backup/log", headers={"Authorization": f"Bearer {token}"}, json={"zone_name": zone_name, "action": action, "status": status, "file_path": file_path}, ) except Exception: pass def _create_zone_archive(zone_name: str) -> Path: """Create a tar.gz archive of a zone directory.""" zone_path = DATA_DIR / zone_name if not zone_path.is_dir(): raise ValueError(f"Zone '{zone_name}' khong ton tai") archive_path = BACKUP_DIR / f"{zone_name}.tar.gz" with tarfile.open(archive_path, "w:gz") as tar: tar.add(str(zone_path), arcname=zone_name) return archive_path _backup_status: dict = {"running": False, "last": None, "error": None, "progress": ""} @router.get("/status") def backup_status(): return { "configured": bool(ADMIN_API_URL), "admin_url": ADMIN_API_URL or None, "running": _backup_status["running"], "last": _backup_status["last"], "error": _backup_status["error"], "progress": _backup_status["progress"], } @router.get("/list") async def list_backups(request: Request): if not ADMIN_API_URL: raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh") token = _get_token(request) if not token: raise HTTPException(401, "Chua dang nhap") try: creds = _get_credentials(token) async with httpx.AsyncClient(timeout=30) as client: resp = await client.get( f"{HF_API}/datasets/{creds['repo']}/tree/main/{creds['path_prefix']}", headers={"Authorization": f"Bearer {creds['hf_token']}"}, ) if resp.status_code == 404: return [] if resp.status_code != 200: raise HTTPException(502, f"HF API error: {resp.status_code} {resp.text}") tree = resp.json() meta = load_meta() return [ { "zone_name": f["path"].split("/")[-1].replace(".tar.gz", ""), "file": f["path"], "size": (f.get("lfs") or {}).get("size") or f.get("size", 0), "last_modified": (f.get("lastCommit") or {}).get("date", ""), "local_exists": f["path"].split("/")[-1].replace(".tar.gz", "") in meta, } for f in tree if f.get("type") == "file" and f["path"].endswith(".tar.gz") ] except ValueError as e: raise HTTPException(502, str(e)) except httpx.HTTPError as e: raise HTTPException(502, f"Khong the ket noi: {e}") def _upload_to_hf(creds: dict, zone_name: str, archive_path: Path): """Upload archive directly to HuggingFace Dataset via huggingface_hub.""" from huggingface_hub import HfApi file_path = f"{creds['path_prefix']}/{zone_name}.tar.gz" api = HfApi(token=creds["hf_token"]) api.upload_file( path_or_fileobj=str(archive_path), path_in_repo=file_path, repo_id=creds["repo"], repo_type="dataset", commit_message=f"Backup zone: {zone_name}", ) @router.post("/zone/{zone_name}") async def backup_zone(zone_name: str, request: Request, background_tasks: BackgroundTasks): if not ADMIN_API_URL: raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh") token = _get_token(request) if not token: raise HTTPException(401, "Chua dang nhap") try: validate_zone_name(zone_name) if not (DATA_DIR / zone_name).is_dir(): raise ValueError(f"Zone '{zone_name}' khong ton tai") except ValueError as e: raise HTTPException(400, str(e)) if _backup_status["running"]: raise HTTPException(409, "Dang co backup khac dang chay") # Fetch credentials before background task (validates JWT now) try: creds = _get_credentials(token) except ValueError as e: raise HTTPException(502, str(e)) def _run(): _backup_status["running"] = True _backup_status["error"] = None _backup_status["progress"] = f"Dang backup zone: {zone_name}..." try: archive_path = _create_zone_archive(zone_name) try: _upload_to_hf(creds, zone_name, archive_path) finally: archive_path.unlink(missing_ok=True) _log_action(token, zone_name, "backup", "success", f"{creds['path_prefix']}/{zone_name}.tar.gz") _backup_status["last"] = datetime.now().isoformat() _backup_status["progress"] = f"Backup zone {zone_name} thanh cong" except Exception as e: _backup_status["error"] = str(e) _backup_status["progress"] = f"Loi backup: {e}" _log_action(token, zone_name, "backup", "error") finally: _backup_status["running"] = False background_tasks.add_task(_run) return {"ok": True, "message": f"Dang backup zone {zone_name} trong nen..."} @router.post("/all") async def backup_all(request: Request, background_tasks: BackgroundTasks): if not ADMIN_API_URL: raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh") token = _get_token(request) if not token: raise HTTPException(401, "Chua dang nhap") if _backup_status["running"]: raise HTTPException(409, "Dang co backup khac dang chay") try: creds = _get_credentials(token) except ValueError as e: raise HTTPException(502, str(e)) def _run(): _backup_status["running"] = True _backup_status["error"] = None _backup_status["progress"] = "Dang backup tat ca zones..." try: meta = load_meta() total = len(meta) done = 0 for zone_name in meta: zone_path = DATA_DIR / zone_name if not zone_path.is_dir(): continue _backup_status["progress"] = f"Dang backup zone {zone_name} ({done + 1}/{total})..." archive_path = _create_zone_archive(zone_name) try: _upload_to_hf(creds, zone_name, archive_path) finally: archive_path.unlink(missing_ok=True) _log_action(token, zone_name, "backup", "success", f"{creds['path_prefix']}/{zone_name}.tar.gz") done += 1 _backup_status["last"] = datetime.now().isoformat() _backup_status["progress"] = "Backup tat ca zones thanh cong" except Exception as e: _backup_status["error"] = str(e) _backup_status["progress"] = f"Loi backup: {e}" finally: _backup_status["running"] = False background_tasks.add_task(_run) return {"ok": True, "message": "Dang backup tat ca zones trong nen..."} def _download_from_hf(creds: dict, zone_name: str) -> bytes: """Download archive directly from HuggingFace Dataset.""" file_path = f"{creds['path_prefix']}/{zone_name}.tar.gz" with httpx.Client(timeout=300, follow_redirects=True) as client: resp = client.get( f"https://huggingface.co/datasets/{creds['repo']}/resolve/main/{file_path}", headers={"Authorization": f"Bearer {creds['hf_token']}"}, ) if resp.status_code == 404: raise FileNotFoundError(f"Backup zone '{zone_name}' khong ton tai") if resp.status_code != 200: raise ValueError(f"HF download error: {resp.status_code}") return resp.content @router.post("/restore/{zone_name}") async def restore_zone(zone_name: str, request: Request, background_tasks: BackgroundTasks): if not ADMIN_API_URL: raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh") token = _get_token(request) if not token: raise HTTPException(401, "Chua dang nhap") try: validate_zone_name(zone_name) except ValueError as e: raise HTTPException(400, str(e)) if _backup_status["running"]: raise HTTPException(409, "Dang co backup/restore khac dang chay") try: creds = _get_credentials(token) except ValueError as e: raise HTTPException(502, str(e)) def _run(): _backup_status["running"] = True _backup_status["error"] = None _backup_status["progress"] = f"Dang restore zone: {zone_name}..." try: data = _download_from_hf(creds, zone_name) archive_path = BACKUP_DIR / f"{zone_name}.tar.gz" archive_path.write_bytes(data) try: zone_path = DATA_DIR / zone_name if zone_path.exists(): shutil.rmtree(zone_path) zone_path.mkdir(parents=True, exist_ok=True) with tarfile.open(archive_path, "r:gz") as tar: for member in tar.getmembers(): member_path = os.path.normpath(member.name) if member_path.startswith("..") or os.path.isabs(member_path): raise ValueError(f"Archive chua path khong an toan: {member.name}") if not member_path.startswith(zone_name): raise ValueError(f"Archive chua path ngoai zone: {member.name}") tar.extractall(path=str(DATA_DIR), filter="data") meta = load_meta() if zone_name not in meta: meta[zone_name] = {"description": f"Restored from backup", "created": datetime.now().isoformat()} save_meta(meta) finally: archive_path.unlink(missing_ok=True) _log_action(token, zone_name, "restore", "success", f"{creds['path_prefix']}/{zone_name}.tar.gz") _backup_status["last"] = datetime.now().isoformat() _backup_status["progress"] = f"Restore zone {zone_name} thanh cong" except Exception as e: _backup_status["error"] = str(e) _backup_status["progress"] = f"Loi restore: {e}" _log_action(token, zone_name, "restore", "error") finally: _backup_status["running"] = False background_tasks.add_task(_run) return {"ok": True, "message": f"Dang restore zone {zone_name} trong nen..."} @router.post("/restore-all") async def restore_all(request: Request, background_tasks: BackgroundTasks): if not ADMIN_API_URL: raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh") token = _get_token(request) if not token: raise HTTPException(401, "Chua dang nhap") if _backup_status["running"]: raise HTTPException(409, "Dang co backup/restore khac dang chay") try: creds = _get_credentials(token) except ValueError as e: raise HTTPException(502, str(e)) def _run(): _backup_status["running"] = True _backup_status["error"] = None _backup_status["progress"] = "Dang restore tat ca zones..." try: # List backups from HF directly with httpx.Client(timeout=30) as client: resp = client.get( f"{HF_API}/datasets/{creds['repo']}/tree/main/{creds['path_prefix']}", headers={"Authorization": f"Bearer {creds['hf_token']}"}, ) if resp.status_code == 404: _backup_status["progress"] = "Khong co backup nao" return if resp.status_code != 200: raise ValueError(f"HF API error: {resp.status_code}") tree = resp.json() backup_files = [ f for f in tree if f.get("type") == "file" and f["path"].endswith(".tar.gz") ] total = len(backup_files) done = 0 for bf in backup_files: zn = bf["path"].split("/")[-1].replace(".tar.gz", "") _backup_status["progress"] = f"Dang restore zone {zn} ({done + 1}/{total})..." try: data = _download_from_hf(creds, zn) except FileNotFoundError: continue archive_path = BACKUP_DIR / f"{zn}.tar.gz" archive_path.write_bytes(data) try: zone_path = DATA_DIR / zn if zone_path.exists(): shutil.rmtree(zone_path) zone_path = DATA_DIR / zn if zone_path.exists(): shutil.rmtree(zone_path) zone_path.mkdir(parents=True, exist_ok=True) with tarfile.open(archive_path, "r:gz") as tar: for member in tar.getmembers(): member_path = os.path.normpath(member.name) if member_path.startswith("..") or os.path.isabs(member_path): raise ValueError(f"Archive chua path khong an toan: {member.name}") if not member_path.startswith(zn): raise ValueError(f"Archive chua path ngoai zone: {member.name}") tar.extractall(path=str(DATA_DIR), filter="data") meta = load_meta() if zn not in meta: meta[zn] = {"description": "Restored from backup", "created": datetime.now().isoformat()} save_meta(meta) finally: archive_path.unlink(missing_ok=True) done += 1 _backup_status["last"] = datetime.now().isoformat() _backup_status["progress"] = f"Restore {done}/{total} zones thanh cong" except Exception as e: _backup_status["error"] = str(e) _backup_status["progress"] = f"Loi restore: {e}" finally: _backup_status["running"] = False background_tasks.add_task(_run) return {"ok": True, "message": "Dang restore tat ca zones trong nen..."}