| """ |
| Backup & Restore API — talks directly to HuggingFace Dataset API. |
| |
| Flow: |
| 1. Space asks Worker for HF credentials (token, repo, user path prefix) |
| 2. Space uploads/downloads/lists archives directly via HuggingFace API |
| """ |
|
|
| import os |
| import shutil |
| import tarfile |
| import tempfile |
| from datetime import datetime |
| from pathlib import Path |
|
|
| import httpx |
| from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, Request |
|
|
| from auth import AuthUser, get_current_user |
| from config import ADMIN_API_URL, BACKUP_DIR, DATA_DIR |
| from routers.terminal import kill_terminal |
| from storage import check_zone_owner, load_meta, save_meta, validate_zone_name |
|
|
# Every endpoint defined in this module is mounted under /api/backup.
router = APIRouter(prefix="/api/backup", tags=["backup"])

# Base URL of the HuggingFace REST API (used for repository tree listings).
HF_API = "https://huggingface.co/api"

# Process-wide state of the current/last background backup or restore job.
# It is read by GET /status and mutated by the background ``_run`` tasks.
_backup_status: dict = {
    "running": False,
    "last": None,
    "error": None,
    "progress": "",
}
|
|
|
|
def _utc_stamp() -> str:
    """Return the current UTC time as a compact ``YYYYMMDDTHHMMSSZ`` stamp.

    An aware ``datetime.now(timezone.utc)`` is used because
    ``datetime.utcnow()`` is deprecated since Python 3.12. The formatted
    output is identical to the previous implementation.
    """
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
|
|
|
def _archive_file_name(zone_name: str) -> str:
    """Build the remote archive name ``<zone_name>__<UTC stamp>.tar.gz``."""
    stamp = _utc_stamp()
    return "{}__{}.tar.gz".format(zone_name, stamp)
|
|
|
|
def _zone_from_backup_name(name: str) -> str:
    """Recover the zone name from a backup file name or repository path.

    Archives are named ``<zone>__<stamp>.tar.gz``. Any directory part of
    *name* is ignored. A name that does not end in ``.tar.gz`` is returned
    as its bare file name.

    The stamp is always the last ``__``-separated field, so the split is
    done from the right. Splitting from the left would truncate a zone
    name that itself contains ``__`` (``my__zone__<stamp>`` -> ``my``).
    """
    suffix = ".tar.gz"
    base = Path(name).name
    if not base.endswith(suffix):
        return base
    stem = base[: -len(suffix)]
    # With no "__" present, rsplit returns the stem unchanged.
    return stem.rsplit("__", 1)[0]
|
|
|
|
def _backup_sort_key(item: dict) -> tuple[str, str]:
    """Sort key for a backup entry: ``(last_modified, backup_name)``.

    A missing or falsy field is replaced by an empty string so that entries
    always compare as pairs of strings.
    """
    modified = item.get("last_modified")
    name = item.get("backup_name")
    return (modified if modified else "", name if name else "")
|
|
|
|
def _get_token(request: Request) -> str:
    """Return the bearer token from the ``Authorization`` header.

    Returns an empty string when the header is absent or does not start
    with the exact prefix ``"Bearer "``.
    """
    scheme = "Bearer "
    header = request.headers.get("Authorization", "")
    if not header.startswith(scheme):
        return ""
    return header[len(scheme):]
|
|
|
|
def _get_credentials(token: str) -> dict:
    """Fetch the backup credentials from the Worker for the given token.

    Sends ``GET {ADMIN_API_URL}/backup/credentials`` with the bearer token.

    Returns:
        The decoded JSON body of a 200 response. Other functions in this
        module read the keys ``hf_token``, ``repo`` and ``path_prefix``.

    Raises:
        ValueError: On any non-200 response. The message is the ``error``
            field of the body (JSON body, or the raw text otherwise), or a
            generic "Worker error" message when that field is absent.
    """
    url = f"{ADMIN_API_URL}/backup/credentials"
    auth_headers = {"Authorization": f"Bearer {token}"}
    with httpx.Client(timeout=15) as client:
        resp = client.get(url, headers=auth_headers)

    if resp.status_code == 200:
        return resp.json()

    content_type = resp.headers.get("content-type", "")
    if "application/json" in content_type:
        payload = resp.json()
    else:
        payload = {"error": resp.text}
    raise ValueError(payload.get("error", f"Worker error: {resp.status_code}"))
|
|
|
|
def _log_action(token: str, zone_name: str, action: str, status: str, file_path: str = ""):
    """Report a backup/restore event to the Worker (best effort).

    Posts ``zone_name``, ``action``, ``status`` and ``file_path`` as JSON to
    ``{ADMIN_API_URL}/backup/log`` with the caller's bearer token.

    This function never raises: a failure to record the event must not fail
    the backup or restore operation itself. Failures are written to the
    module logger instead of being silently discarded.
    """
    # Local import: ``logging`` is not among the module-level imports.
    import logging

    payload = {
        "zone_name": zone_name,
        "action": action,
        "status": status,
        "file_path": file_path,
    }
    try:
        with httpx.Client(timeout=10) as client:
            client.post(
                f"{ADMIN_API_URL}/backup/log",
                headers={"Authorization": f"Bearer {token}"},
                json=payload,
            )
    except Exception as exc:
        # Deliberately broad: this is a best-effort side channel.
        logging.getLogger(__name__).warning(
            "Could not log backup action %s/%s for zone %s: %s",
            action,
            status,
            zone_name,
            exc,
        )
|
|
|
|
def _create_zone_archive(zone_name: str) -> tuple[Path, str]:
    """Pack a zone directory into a temporary ``.tar.gz`` under ``BACKUP_DIR``.

    The zone directory ``DATA_DIR / zone_name`` is stored in the archive
    under the top-level name ``zone_name``.

    Returns:
        ``(archive_path, backup_name)``: the local temporary archive and the
        file name it should get in the remote repository. The caller owns
        the temporary file and must delete it.

    Raises:
        ValueError: If the zone directory does not exist.
    """
    zone_path = DATA_DIR / zone_name
    if not zone_path.is_dir():
        raise ValueError(f"Zone '{zone_name}' khong ton tai")

    fd, temp_path = tempfile.mkstemp(prefix=f"{zone_name}-", suffix=".tar.gz", dir=str(BACKUP_DIR))
    os.close(fd)
    archive_path = Path(temp_path)
    try:
        with tarfile.open(archive_path, "w:gz") as tar:
            tar.add(str(zone_path), arcname=zone_name)
    except BaseException:
        # The caller never receives the path when packing fails, so the
        # partial archive has to be removed here or it is leaked.
        archive_path.unlink(missing_ok=True)
        raise
    return archive_path, _archive_file_name(zone_name)
|
|
|
|
def _assert_restore_allowed(zone_name: str, user: AuthUser):
    """Check that *user* may act on *zone_name*.

    A zone that has no entry in the local metadata is not checked at all.
    For a known zone the decision is delegated to ``check_zone_owner``,
    which is expected to raise when access is denied.
    """
    if zone_name not in load_meta():
        return
    check_zone_owner(zone_name, user.sub, user.role)
|
|
|
|
def _extract_archive(archive_path: Path, zone_name: str):
    """Replace the zone directory with the contents of a backup archive.

    Every member of the archive must live under the top-level directory
    ``zone_name``. The archive is opened and fully validated BEFORE the
    existing zone directory is removed, so a corrupt or unsafe archive can
    no longer destroy the data it was meant to restore.

    Raises:
        ValueError: If a member path is absolute, escapes upwards, or lies
            outside ``zone_name``.
        tarfile.TarError: If the archive cannot be read.
    """
    zone_path = DATA_DIR / zone_name

    with tarfile.open(archive_path, "r:gz") as tar:
        for member in tar.getmembers():
            member_path = os.path.normpath(member.name)
            if member_path.startswith("..") or os.path.isabs(member_path):
                raise ValueError(f"Archive chua path khong an toan: {member.name}")
            if member_path != zone_name and not member_path.startswith(f"{zone_name}/"):
                raise ValueError(f"Archive chua path ngoai zone: {member.name}")

        # The archive is readable and contained: now swap the directory.
        if zone_path.exists():
            shutil.rmtree(zone_path)
        zone_path.mkdir(parents=True, exist_ok=True)

        # filter="data" needs a Python with tarfile extraction filters
        # (3.12+, or an earlier release carrying the security backport).
        tar.extractall(path=str(DATA_DIR), filter="data")
|
|
|
|
def _ensure_restored_meta(zone_name: str, user: AuthUser):
    """Create a metadata entry for a restored zone if none exists yet.

    An existing entry is left untouched. A new entry records *user* as the
    owner and the current local time as the creation time.
    """
    meta = load_meta()
    if zone_name in meta:
        return

    entry = {
        "description": "Restored from backup",
        "created": datetime.now().isoformat(),
        "owner_id": user.sub,
        "owner_name": user.username,
    }
    meta[zone_name] = entry
    save_meta(meta)
|
|
|
|
def _download_from_hf(creds: dict, backup_name: str) -> bytes:
    """Download one backup archive from the HuggingFace dataset repo.

    The file is read from ``<path_prefix>/<backup_name>`` on the ``main``
    revision of ``creds["repo"]`` and returned entirely in memory.

    Raises:
        FileNotFoundError: If HuggingFace answers 404.
        ValueError: On any other non-200 status.
    """
    remote_path = f"{creds['path_prefix']}/{backup_name}"
    url = f"https://huggingface.co/datasets/{creds['repo']}/resolve/main/{remote_path}"
    auth_headers = {"Authorization": f"Bearer {creds['hf_token']}"}

    with httpx.Client(timeout=300, follow_redirects=True) as client:
        resp = client.get(url, headers=auth_headers)

    status = resp.status_code
    if status == 404:
        raise FileNotFoundError(f"Backup '{backup_name}' khong ton tai")
    if status != 200:
        raise ValueError(f"HF download error: {status}")
    return resp.content
|
|
|
|
def _list_backups_from_hf(creds: dict) -> list[dict]:
    """List the ``.tar.gz`` backups stored under the user's path prefix.

    Queries the HuggingFace tree API for ``creds["path_prefix"]`` on the
    ``main`` revision of ``creds["repo"]``.

    Returns:
        One dict per archive with the keys ``zone_name``, ``backup_name``,
        ``file``, ``size``, ``last_modified`` and ``local_exists``, sorted
        newest first (by ``last_modified``, then ``backup_name``). An empty
        list is returned when the prefix does not exist (404).

    Raises:
        ValueError: On any status other than 200 or 404.
    """
    tree_url = f"{HF_API}/datasets/{creds['repo']}/tree/main/{creds['path_prefix']}"
    with httpx.Client(timeout=30) as client:
        resp = client.get(
            tree_url,
            headers={"Authorization": f"Bearer {creds['hf_token']}"},
        )

    if resp.status_code == 404:
        return []
    if resp.status_code != 200:
        raise ValueError(f"HF API error: {resp.status_code} {resp.text}")

    known_zones = load_meta()
    backups: list[dict] = []
    for entry in resp.json():
        path = entry.get("path", "")
        is_archive = entry.get("type") == "file" and path.endswith(".tar.gz")
        if not is_archive:
            continue

        backup_name = path.rsplit("/", 1)[-1]
        zone_name = _zone_from_backup_name(backup_name)
        lfs_info = entry.get("lfs") or {}
        commit_info = entry.get("lastCommit") or {}
        # The size reported in the "lfs" block wins over the plain "size".
        size = lfs_info.get("size") or entry.get("size", 0)
        backups.append(
            {
                "zone_name": zone_name,
                "backup_name": backup_name,
                "file": path,
                "size": size,
                "last_modified": commit_info.get("date", ""),
                "local_exists": zone_name in known_zones,
            }
        )

    backups.sort(key=_backup_sort_key, reverse=True)
    return backups
|
|
|
|
def _delete_backup_from_hf(creds: dict, backup_name: str):
    """Delete ``<path_prefix>/<backup_name>`` from the dataset repo.

    The deletion is made through ``huggingface_hub.HfApi.delete_file`` with
    a commit message naming the backup.
    """
    # Imported lazily, as in the rest of this module.
    from huggingface_hub import HfApi

    hub = HfApi(token=creds["hf_token"])
    request_args = {
        "path_in_repo": f"{creds['path_prefix']}/{backup_name}",
        "repo_id": creds["repo"],
        "repo_type": "dataset",
        "commit_message": f"Delete backup: {backup_name}",
    }
    hub.delete_file(**request_args)
|
|
|
|
def _upload_to_hf(creds: dict, backup_name: str, archive_path: Path):
    """Upload a local archive to ``<path_prefix>/<backup_name>`` in the repo.

    The upload is made through ``huggingface_hub.HfApi.upload_file`` with a
    commit message naming the backup.
    """
    # Imported lazily, as in the rest of this module.
    from huggingface_hub import HfApi

    hub = HfApi(token=creds["hf_token"])
    request_args = {
        "path_or_fileobj": str(archive_path),
        "path_in_repo": f"{creds['path_prefix']}/{backup_name}",
        "repo_id": creds["repo"],
        "repo_type": "dataset",
        "commit_message": f"Backup: {backup_name}",
    }
    hub.upload_file(**request_args)
|
|
|
|
@router.get("/status")
def backup_status():
    """Report whether backups are configured and the state of the last job."""
    payload = {
        "configured": bool(ADMIN_API_URL),
        "admin_url": ADMIN_API_URL or None,
    }
    # Copy the job-state fields in a fixed order.
    for field in ("running", "last", "error", "progress"):
        payload[field] = _backup_status[field]
    return payload
|
|
|
|
@router.get("/list")
async def list_backups(request: Request, user: AuthUser = Depends(get_current_user)):
    """List the backups visible to the current user.

    Admins see every backup. Other users see backups of zones they own,
    plus backups whose zone has no local metadata entry.

    Raises:
        HTTPException: 400 if ``ADMIN_API_URL`` is not configured, 401 if no
            bearer token was sent, 502 if the Worker or HuggingFace call
            fails.
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")

    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")

    try:
        backups = _list_backups_from_hf(_get_credentials(token))
        if user.role != "admin":
            owned = {
                zone
                for zone, info in load_meta().items()
                if info.get("owner_id") == user.sub
            }
            backups = [
                entry
                for entry in backups
                if not entry["local_exists"] or entry["zone_name"] in owned
            ]
        return backups
    except ValueError as e:
        raise HTTPException(502, str(e))
    except httpx.HTTPError as e:
        raise HTTPException(502, f"Khong the ket noi: {e}")
|
|
|
|
@router.delete("/file")
async def delete_backup_file(
    request: Request,
    backup_name: str = Query(...),
    user: AuthUser = Depends(get_current_user),
):
    """Delete one backup archive from the HuggingFace dataset repo.

    The zone is derived from *backup_name*, and the same permission check
    as for restores is applied before the file is deleted.

    Raises:
        HTTPException: 400 if ``ADMIN_API_URL`` is not configured, if
            *backup_name* is not a plain file name, or if a helper raises
            ``ValueError``; 401 if no bearer token was sent; 502 on any
            other failure.
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")

    # Same guard as the other endpoints: fail fast without a token.
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")

    # backup_name is appended to the user's path prefix, so it must be a
    # bare file name. Path components could point outside that prefix.
    if backup_name in ("", ".", "..") or "/" in backup_name or "\\" in backup_name:
        raise HTTPException(400, "Ten backup khong hop le")

    zone_name = _zone_from_backup_name(backup_name)
    try:
        _assert_restore_allowed(zone_name, user)
        creds = _get_credentials(token)
        _delete_backup_from_hf(creds, backup_name)
        _log_action(token, zone_name, "delete_backup", "success", backup_name)
        return {"ok": True}
    except HTTPException:
        # Keep a helper's own HTTP status instead of rewriting it to 502.
        raise
    except ValueError as e:
        raise HTTPException(400, str(e))
    except Exception as e:
        raise HTTPException(502, str(e))
|
|
|
|
@router.post("/zone/{zone_name}")
async def backup_zone(
    zone_name: str,
    request: Request,
    background_tasks: BackgroundTasks,
    user: AuthUser = Depends(get_current_user),
):
    """Start a background backup of a single zone.

    The request validates the zone and fetches the credentials. Archiving
    and uploading run in a background task whose progress is published in
    ``_backup_status``.

    Raises:
        HTTPException: 400 if ``ADMIN_API_URL`` is not configured or the
            zone is invalid or missing; 401 if no bearer token was sent;
            409 if another job is running; 502 if the credentials cannot be
            fetched (Worker error or connection failure).
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")

    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")

    try:
        validate_zone_name(zone_name)
        check_zone_owner(zone_name, user.sub, user.role)
        if not (DATA_DIR / zone_name).is_dir():
            raise ValueError(f"Zone '{zone_name}' khong ton tai")
    except ValueError as e:
        raise HTTPException(400, str(e))

    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup khac dang chay")

    try:
        creds = _get_credentials(token)
    except ValueError as e:
        raise HTTPException(502, str(e))
    except httpx.HTTPError as e:
        # A connection failure used to escape as a 500. Report it the same
        # way the /list endpoint does.
        raise HTTPException(502, f"Khong the ket noi: {e}")

    def _run():
        """Archive the zone, upload it, and record the outcome."""
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = f"Dang backup zone: {zone_name}..."
        archive_path: Path | None = None
        backup_name = ""
        try:
            archive_path, backup_name = _create_zone_archive(zone_name)
            _upload_to_hf(creds, backup_name, archive_path)
            _log_action(token, zone_name, "backup", "success", f"{creds['path_prefix']}/{backup_name}")
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = f"Backup zone {zone_name} thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi backup: {e}"
            _log_action(token, zone_name, "backup", "error", backup_name)
        finally:
            # Always remove the temporary archive and release the flag.
            if archive_path:
                archive_path.unlink(missing_ok=True)
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": f"Dang backup zone {zone_name} trong nen..."}
|
|
|
|
@router.post("/all")
async def backup_all(
    request: Request,
    background_tasks: BackgroundTasks,
    user: AuthUser = Depends(get_current_user),
):
    """Start a background backup of every zone the user may back up.

    Admins back up all zones listed in the metadata; other users only the
    zones they own. Zones whose directory is missing are skipped. The
    background task stops at the first zone that fails.

    Raises:
        HTTPException: 400 if ``ADMIN_API_URL`` is not configured; 401 if no
            bearer token was sent; 409 if another job is running; 502 if the
            credentials cannot be fetched (Worker error or connection
            failure).
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")

    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup khac dang chay")

    try:
        creds = _get_credentials(token)
    except ValueError as e:
        raise HTTPException(502, str(e))
    except httpx.HTTPError as e:
        # A connection failure used to escape as a 500. Report it the same
        # way the /list endpoint does.
        raise HTTPException(502, f"Khong the ket noi: {e}")

    meta = load_meta()
    zone_names = [
        name
        for name, info in meta.items()
        if (DATA_DIR / name).is_dir() and (user.role == "admin" or info.get("owner_id") == user.sub)
    ]

    def _run():
        """Archive and upload each selected zone in turn."""
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = "Dang backup tat ca zones..."
        try:
            total = len(zone_names)
            for idx, zone_name in enumerate(zone_names, start=1):
                _backup_status["progress"] = f"Dang backup zone {zone_name} ({idx}/{total})..."
                archive_path, backup_name = _create_zone_archive(zone_name)
                try:
                    _upload_to_hf(creds, backup_name, archive_path)
                finally:
                    # The temporary archive is removed whether or not the
                    # upload succeeded.
                    archive_path.unlink(missing_ok=True)
                _log_action(token, zone_name, "backup", "success", f"{creds['path_prefix']}/{backup_name}")
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = "Backup tat ca zones thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi backup: {e}"
        finally:
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": "Dang backup tat ca zones trong nen..."}
|
|
|
|
@router.post("/restore/{zone_name}")
async def restore_zone(
    zone_name: str,
    request: Request,
    background_tasks: BackgroundTasks,
    backup_name: str | None = Query(None),
    user: AuthUser = Depends(get_current_user),
):
    """Start a background restore of one zone from a backup archive.

    When *backup_name* is omitted, the newest backup of the zone is used.
    The background task downloads the archive, stops the zone's terminal,
    replaces the zone directory and makes sure a metadata entry exists.

    Raises:
        HTTPException: 400 if ``ADMIN_API_URL`` is not configured, the zone
            name or *backup_name* is invalid, or the user may not restore
            the zone; 401 if no bearer token was sent; 409 if another job is
            running; 502 if the credentials or backup list cannot be
            fetched, or no backup exists for the zone.
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")

    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")

    try:
        validate_zone_name(zone_name)
        _assert_restore_allowed(zone_name, user)
    except ValueError as e:
        raise HTTPException(400, str(e))

    # backup_name is appended to the user's path prefix, so it must be a
    # bare file name. Path components could point outside that prefix.
    if backup_name and (backup_name in (".", "..") or "/" in backup_name or "\\" in backup_name):
        raise HTTPException(400, "Ten backup khong hop le")

    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup/restore khac dang chay")

    try:
        creds = _get_credentials(token)
        target_backup_name = backup_name
        if not target_backup_name:
            # The list is sorted newest first, so the first match is the
            # latest backup of this zone.
            backups = [item for item in _list_backups_from_hf(creds) if item["zone_name"] == zone_name]
            if not backups:
                raise ValueError(f"Khong tim thay backup cho zone '{zone_name}'")
            target_backup_name = backups[0]["backup_name"]
    except ValueError as e:
        raise HTTPException(502, str(e))
    except httpx.HTTPError as e:
        # A connection failure used to escape as a 500. Report it the same
        # way the /list endpoint does.
        raise HTTPException(502, f"Khong the ket noi: {e}")

    def _run():
        """Download the archive, extract it, and record the outcome."""
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = f"Dang restore zone: {zone_name}..."
        archive_path: Path | None = None
        try:
            data = _download_from_hf(creds, target_backup_name)
            fd, temp_path = tempfile.mkstemp(prefix=f"restore-{zone_name}-", suffix=".tar.gz", dir=str(BACKUP_DIR))
            os.close(fd)
            archive_path = Path(temp_path)
            archive_path.write_bytes(data)
            # Stop the zone's terminal before its files are replaced.
            kill_terminal(zone_name)
            _extract_archive(archive_path, zone_name)
            _ensure_restored_meta(zone_name, user)
            _log_action(token, zone_name, "restore", "success", f"{creds['path_prefix']}/{target_backup_name}")
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = f"Restore zone {zone_name} thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi restore: {e}"
            _log_action(token, zone_name, "restore", "error", target_backup_name or "")
        finally:
            # Always remove the temporary archive and release the flag.
            if archive_path:
                archive_path.unlink(missing_ok=True)
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": f"Dang restore zone {zone_name} trong nen..."}
|
|
|
|
@router.post("/restore-all")
async def restore_all(
    request: Request,
    background_tasks: BackgroundTasks,
    user: AuthUser = Depends(get_current_user),
):
    """Start a background restore of the newest backup of every allowed zone.

    For each zone found in the remote backup list, the newest archive is
    selected. Zones with an invalid name, or that the user may not restore,
    are skipped. The background task stops at the first zone that fails.

    Raises:
        HTTPException: 400 if ``ADMIN_API_URL`` is not configured; 401 if no
            bearer token was sent; 409 if another job is running; 502 if the
            credentials or backup list cannot be fetched (Worker or
            HuggingFace error, or connection failure).
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")

    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup/restore khac dang chay")

    try:
        creds = _get_credentials(token)
        backups = _list_backups_from_hf(creds)
    except ValueError as e:
        raise HTTPException(502, str(e))
    except httpx.HTTPError as e:
        # A connection failure used to escape as a 500. Report it the same
        # way the /list endpoint does.
        raise HTTPException(502, f"Khong the ket noi: {e}")

    latest_by_zone: dict[str, dict] = {}
    for item in backups:
        zone_name = item["zone_name"]
        try:
            # The zone name comes from a remote file name and is later used
            # to delete and recreate a directory under DATA_DIR, so it gets
            # the same validation as the single-zone endpoints.
            validate_zone_name(zone_name)
            _assert_restore_allowed(zone_name, user)
        except ValueError:
            continue
        # Backups are sorted newest first: keep the first one per zone.
        latest_by_zone.setdefault(zone_name, item)

    def _run():
        """Download and extract the selected archive of each zone in turn."""
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = "Dang restore tat ca zones..."
        try:
            items = list(latest_by_zone.values())
            total = len(items)
            done = 0
            for idx, item in enumerate(items, start=1):
                zone_name = item["zone_name"]
                backup_name = item["backup_name"]
                _backup_status["progress"] = f"Dang restore zone {zone_name} ({idx}/{total})..."
                archive_path: Path | None = None
                try:
                    data = _download_from_hf(creds, backup_name)
                    fd, temp_path = tempfile.mkstemp(prefix=f"restore-{zone_name}-", suffix=".tar.gz", dir=str(BACKUP_DIR))
                    os.close(fd)
                    archive_path = Path(temp_path)
                    archive_path.write_bytes(data)
                    # Stop the zone's terminal before its files are replaced.
                    kill_terminal(zone_name)
                    _extract_archive(archive_path, zone_name)
                    _ensure_restored_meta(zone_name, user)
                    done += 1
                finally:
                    if archive_path:
                        archive_path.unlink(missing_ok=True)
                # Record each restored zone, as the single-zone restore and
                # the bulk backup do.
                _log_action(token, zone_name, "restore", "success", f"{creds['path_prefix']}/{backup_name}")
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = f"Restore {done}/{total} zones thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi restore: {e}"
        finally:
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": "Dang restore tat ca zones trong nen..."}
|
|