| """ |
| Backup & Restore API — talks directly to HuggingFace Dataset API. |
| |
| Flow: |
| 1. Space asks Worker for HF credentials (token, repo, user path prefix) |
| 2. Space uploads/downloads/lists archives directly via HuggingFace API |
| |
| Requires env var: |
| ADMIN_API_URL - URL of the Cloudflare Worker admin API |
| """ |
|
|
| import os |
| import shutil |
| import tarfile |
| from datetime import datetime |
| from pathlib import Path |
|
|
| import httpx |
| from fastapi import APIRouter, HTTPException, BackgroundTasks, Request |
|
|
| from config import DATA_DIR, ADMIN_API_URL, BACKUP_DIR |
| from storage import load_meta, save_meta, validate_zone_name |
|
|
# All endpoints in this module live under /api/backup.
router = APIRouter(prefix="/api/backup", tags=["backup"])


# Base URL of the public HuggingFace REST API (used for dataset tree listings).
HF_API = "https://huggingface.co/api"
|
|
|
|
def _get_token(request: Request) -> str:
    """Pull the bearer JWT out of the request's Authorization header.

    Returns the empty string when no "Bearer " credential is present.
    """
    header_value = request.headers.get("Authorization", "")
    prefix = "Bearer "
    if not header_value.startswith(prefix):
        return ""
    return header_value[len(prefix):]
|
|
|
|
def _get_credentials(token: str) -> dict:
    """Exchange the user's JWT for HF credentials via the Worker.

    Returns the Worker's JSON payload ({hf_token, repo, path_prefix}).
    Raises ValueError carrying the Worker's error message (or a generic
    status-code message) on any non-200 response.
    """
    with httpx.Client(timeout=15) as client:
        resp = client.get(
            f"{ADMIN_API_URL}/backup/credentials",
            headers={"Authorization": f"Bearer {token}"},
        )
        if resp.status_code == 200:
            return resp.json()
        # Prefer the Worker's structured error when the body is JSON.
        if "application/json" in resp.headers.get("content-type", ""):
            payload = resp.json()
        else:
            payload = {"error": resp.text}
        raise ValueError(payload.get("error", f"Worker error: {resp.status_code}"))
|
|
|
|
def _log_action(token: str, zone_name: str, action: str, status: str, file_path: str = ""):
    """Report a backup/restore action to the Worker (best-effort).

    Any failure is deliberately swallowed: an audit-log outage must never
    break the backup/restore flow itself.
    """
    payload = {
        "zone_name": zone_name,
        "action": action,
        "status": status,
        "file_path": file_path,
    }
    try:
        with httpx.Client(timeout=10) as client:
            client.post(
                f"{ADMIN_API_URL}/backup/log",
                headers={"Authorization": f"Bearer {token}"},
                json=payload,
            )
    except Exception:
        # Intentional: logging is fire-and-forget.
        pass
|
|
|
|
def _create_zone_archive(zone_name: str) -> Path:
    """Pack DATA_DIR/<zone_name> into BACKUP_DIR/<zone_name>.tar.gz.

    Raises ValueError when the zone directory does not exist.
    Returns the path of the archive that was written.
    """
    source_dir = DATA_DIR / zone_name
    if not source_dir.is_dir():
        raise ValueError(f"Zone '{zone_name}' khong ton tai")

    target = BACKUP_DIR / f"{zone_name}.tar.gz"
    # arcname keeps the zone directory name as the archive root so restore
    # extracts back to DATA_DIR/<zone_name>.
    with tarfile.open(target, "w:gz") as tar:
        tar.add(str(source_dir), arcname=zone_name)
    return target
|
|
|
|
| _backup_status: dict = {"running": False, "last": None, "error": None, "progress": ""} |
|
|
|
|
@router.get("/status")
def backup_status():
    """Expose backup configuration plus current background-task state."""
    state = _backup_status
    return {
        "configured": bool(ADMIN_API_URL),
        "admin_url": ADMIN_API_URL or None,
        "running": state["running"],
        "last": state["last"],
        "error": state["error"],
        "progress": state["progress"],
    }
|
|
|
|
@router.get("/list")
async def list_backups(request: Request):
    """List backup archives stored under the user's prefix in the HF dataset.

    Returns a list of {zone_name, file, size, last_modified, local_exists}.

    Fix: the zone name is now derived with removesuffix() instead of
    replace(), so a file name containing ".tar.gz" in the middle is not
    mangled; the duplicated name-derivation expression is computed once.
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    try:
        creds = _get_credentials(token)
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.get(
                f"{HF_API}/datasets/{creds['repo']}/tree/main/{creds['path_prefix']}",
                headers={"Authorization": f"Bearer {creds['hf_token']}"},
            )
            if resp.status_code == 404:
                # No backups yet for this prefix — empty list, not an error.
                return []
            if resp.status_code != 200:
                raise HTTPException(502, f"HF API error: {resp.status_code} {resp.text}")
            meta = load_meta()
            results = []
            for entry in resp.json():
                if entry.get("type") != "file" or not entry["path"].endswith(".tar.gz"):
                    continue
                zone_name = entry["path"].rsplit("/", 1)[-1].removesuffix(".tar.gz")
                results.append({
                    "zone_name": zone_name,
                    "file": entry["path"],
                    # LFS-tracked files report their size under "lfs".
                    "size": (entry.get("lfs") or {}).get("size") or entry.get("size", 0),
                    "last_modified": (entry.get("lastCommit") or {}).get("date", ""),
                    "local_exists": zone_name in meta,
                })
            return results
    except ValueError as e:
        raise HTTPException(502, str(e))
    except httpx.HTTPError as e:
        raise HTTPException(502, f"Khong the ket noi: {e}")
|
|
|
|
def _upload_to_hf(creds: dict, zone_name: str, archive_path: Path):
    """Push a zone archive into the HF dataset under the user's prefix.

    huggingface_hub is imported lazily so it is only loaded when a backup
    actually runs.
    """
    from huggingface_hub import HfApi

    remote_path = f"{creds['path_prefix']}/{zone_name}.tar.gz"
    HfApi(token=creds["hf_token"]).upload_file(
        path_or_fileobj=str(archive_path),
        path_in_repo=remote_path,
        repo_id=creds["repo"],
        repo_type="dataset",
        commit_message=f"Backup zone: {zone_name}",
    )
|
|
|
|
@router.post("/zone/{zone_name}")
async def backup_zone(zone_name: str, request: Request, background_tasks: BackgroundTasks):
    """Archive one zone and upload it to HF in a background task.

    Validates auth, zone name and existence up front; the actual archive +
    upload runs after the response is sent.
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    try:
        validate_zone_name(zone_name)
        if not (DATA_DIR / zone_name).is_dir():
            raise ValueError(f"Zone '{zone_name}' khong ton tai")
    except ValueError as e:
        raise HTTPException(400, str(e))
    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup khac dang chay")

    # Fetch credentials synchronously so auth failures surface in the response.
    try:
        creds = _get_credentials(token)
    except ValueError as e:
        raise HTTPException(502, str(e))

    def _run():
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = f"Dang backup zone: {zone_name}..."
        try:
            archive = _create_zone_archive(zone_name)
            try:
                _upload_to_hf(creds, zone_name, archive)
            finally:
                # The local archive is only a staging file; always remove it.
                archive.unlink(missing_ok=True)
            _log_action(token, zone_name, "backup", "success",
                        f"{creds['path_prefix']}/{zone_name}.tar.gz")
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = f"Backup zone {zone_name} thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi backup: {e}"
            _log_action(token, zone_name, "backup", "error")
        finally:
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": f"Dang backup zone {zone_name} trong nen..."}
|
|
|
|
@router.post("/all")
async def backup_all(request: Request, background_tasks: BackgroundTasks):
    """Archive and upload every known zone, one at a time, in the background."""
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup khac dang chay")

    # Fetch credentials synchronously so auth failures surface in the response.
    try:
        creds = _get_credentials(token)
    except ValueError as e:
        raise HTTPException(502, str(e))

    def _run():
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = "Dang backup tat ca zones..."
        try:
            meta = load_meta()
            total = len(meta)
            done = 0
            for zone_name in meta:
                if not (DATA_DIR / zone_name).is_dir():
                    # Listed in meta but missing on disk — nothing to archive.
                    continue
                _backup_status["progress"] = f"Dang backup zone {zone_name} ({done + 1}/{total})..."
                archive = _create_zone_archive(zone_name)
                try:
                    _upload_to_hf(creds, zone_name, archive)
                finally:
                    archive.unlink(missing_ok=True)
                _log_action(token, zone_name, "backup", "success",
                            f"{creds['path_prefix']}/{zone_name}.tar.gz")
                done += 1
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = "Backup tat ca zones thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi backup: {e}"
        finally:
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": "Dang backup tat ca zones trong nen..."}
|
|
|
|
def _download_from_hf(creds: dict, zone_name: str) -> bytes:
    """Fetch <prefix>/<zone_name>.tar.gz from the HF dataset as bytes.

    Raises FileNotFoundError when the archive does not exist (404) and
    ValueError for any other non-200 response.
    """
    url = (
        f"https://huggingface.co/datasets/{creds['repo']}"
        f"/resolve/main/{creds['path_prefix']}/{zone_name}.tar.gz"
    )
    # follow_redirects: /resolve redirects to the CDN for LFS-backed files.
    with httpx.Client(timeout=300, follow_redirects=True) as client:
        resp = client.get(url, headers={"Authorization": f"Bearer {creds['hf_token']}"})
        if resp.status_code == 404:
            raise FileNotFoundError(f"Backup zone '{zone_name}' khong ton tai")
        if resp.status_code != 200:
            raise ValueError(f"HF download error: {resp.status_code}")
        return resp.content
|
|
|
|
@router.post("/restore/{zone_name}")
async def restore_zone(zone_name: str, request: Request, background_tasks: BackgroundTasks):
    """Download one zone's archive from HF and restore it into DATA_DIR.

    The existing zone directory (if any) is replaced by the archive contents.

    Fix: the member-path guard used a bare startswith(zone_name), which
    accepts entries belonging to a *different* zone whose name merely begins
    with this one (e.g. member "abc/..." when restoring zone "ab"), letting
    an archive write outside the target zone.  The guard now requires each
    member to be the zone directory itself or a path strictly inside it.
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    try:
        validate_zone_name(zone_name)
    except ValueError as e:
        raise HTTPException(400, str(e))
    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup/restore khac dang chay")

    # Fetch credentials synchronously so auth failures surface in the response.
    try:
        creds = _get_credentials(token)
    except ValueError as e:
        raise HTTPException(502, str(e))

    def _run():
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = f"Dang restore zone: {zone_name}..."
        try:
            data = _download_from_hf(creds, zone_name)
            archive_path = BACKUP_DIR / f"{zone_name}.tar.gz"
            archive_path.write_bytes(data)

            try:
                zone_path = DATA_DIR / zone_name
                if zone_path.exists():
                    shutil.rmtree(zone_path)
                zone_path.mkdir(parents=True, exist_ok=True)
                with tarfile.open(archive_path, "r:gz") as tar:
                    for member in tar.getmembers():
                        member_path = os.path.normpath(member.name)
                        if member_path.startswith("..") or os.path.isabs(member_path):
                            raise ValueError(f"Archive chua path khong an toan: {member.name}")
                        # Must be the zone dir itself or a path inside it; a
                        # bare prefix match would accept a sibling zone whose
                        # name starts with zone_name.
                        if member_path != zone_name and not member_path.startswith(zone_name + os.sep):
                            raise ValueError(f"Archive chua path ngoai zone: {member.name}")
                    # filter="data" strips dangerous metadata (3.12+).
                    tar.extractall(path=str(DATA_DIR), filter="data")
                meta = load_meta()
                if zone_name not in meta:
                    meta[zone_name] = {"description": "Restored from backup", "created": datetime.now().isoformat()}
                save_meta(meta)
            finally:
                archive_path.unlink(missing_ok=True)

            _log_action(token, zone_name, "restore", "success",
                        f"{creds['path_prefix']}/{zone_name}.tar.gz")
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = f"Restore zone {zone_name} thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi restore: {e}"
            _log_action(token, zone_name, "restore", "error")
        finally:
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": f"Dang restore zone {zone_name} trong nen..."}
|
|
|
|
@router.post("/restore-all")
async def restore_all(request: Request, background_tasks: BackgroundTasks):
    """Restore every *.tar.gz archive found under the user's HF prefix.

    Fixes:
    - removed a duplicated copy-pasted "rmtree the zone dir" block;
    - zone name derived with removesuffix() instead of replace(), so names
      containing ".tar.gz" mid-string are not mangled;
    - member-path guard tightened to reject entries for a different zone
      whose name merely starts with this zone's name (same defect as the
      single-zone restore).
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup/restore khac dang chay")

    # Fetch credentials synchronously so auth failures surface in the response.
    try:
        creds = _get_credentials(token)
    except ValueError as e:
        raise HTTPException(502, str(e))

    def _run():
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = "Dang restore tat ca zones..."
        try:
            # Enumerate the archives available under the user's prefix.
            with httpx.Client(timeout=30) as client:
                resp = client.get(
                    f"{HF_API}/datasets/{creds['repo']}/tree/main/{creds['path_prefix']}",
                    headers={"Authorization": f"Bearer {creds['hf_token']}"},
                )
                if resp.status_code == 404:
                    _backup_status["progress"] = "Khong co backup nao"
                    return
                if resp.status_code != 200:
                    raise ValueError(f"HF API error: {resp.status_code}")

                backup_files = [
                    f for f in resp.json()
                    if f.get("type") == "file" and f["path"].endswith(".tar.gz")
                ]
            total = len(backup_files)
            done = 0
            for bf in backup_files:
                zn = bf["path"].rsplit("/", 1)[-1].removesuffix(".tar.gz")
                _backup_status["progress"] = f"Dang restore zone {zn} ({done + 1}/{total})..."
                try:
                    data = _download_from_hf(creds, zn)
                except FileNotFoundError:
                    # Listed a moment ago but gone now — skip quietly.
                    continue
                archive_path = BACKUP_DIR / f"{zn}.tar.gz"
                archive_path.write_bytes(data)

                try:
                    zone_path = DATA_DIR / zn
                    if zone_path.exists():
                        shutil.rmtree(zone_path)
                    zone_path.mkdir(parents=True, exist_ok=True)
                    with tarfile.open(archive_path, "r:gz") as tar:
                        for member in tar.getmembers():
                            member_path = os.path.normpath(member.name)
                            if member_path.startswith("..") or os.path.isabs(member_path):
                                raise ValueError(f"Archive chua path khong an toan: {member.name}")
                            # Must be the zone dir itself or a path inside it.
                            if member_path != zn and not member_path.startswith(zn + os.sep):
                                raise ValueError(f"Archive chua path ngoai zone: {member.name}")
                        # filter="data" strips dangerous metadata (3.12+).
                        tar.extractall(path=str(DATA_DIR), filter="data")
                    meta = load_meta()
                    if zn not in meta:
                        meta[zn] = {"description": "Restored from backup", "created": datetime.now().isoformat()}
                    save_meta(meta)
                finally:
                    archive_path.unlink(missing_ok=True)
                done += 1

            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = f"Restore {done}/{total} zones thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi restore: {e}"
        finally:
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": "Dang restore tat ca zones trong nen..."}
|
|