# File: cc3m/routers/backup.py
# Uploaded by kokokoasd ("Upload 20 files"), commit 162a424 (verified).
"""
Backup & Restore API — talks directly to the HuggingFace Dataset API.

Flow:
    1. The Space asks the Worker for HF credentials (token, repo, user path prefix).
    2. The Space uploads/downloads/lists archives directly via the HuggingFace API.

Requires env var:
    ADMIN_API_URL - URL of the Cloudflare Worker admin API
"""
import os
import shutil
import tarfile
from datetime import datetime
from pathlib import Path

import httpx
from fastapi import APIRouter, HTTPException, BackgroundTasks, Request
from fastapi.concurrency import run_in_threadpool

from config import DATA_DIR, ADMIN_API_URL, BACKUP_DIR
from storage import load_meta, save_meta, validate_zone_name
# Every endpoint declared on this router is served under /api/backup.
router = APIRouter(prefix="/api/backup", tags=["backup"])
# Base URL of the HuggingFace REST API; used below for dataset tree listings.
HF_API = "https://huggingface.co/api"
def _get_token(request: Request) -> str:
    """Extract the bearer token from the request's Authorization header.

    The authentication scheme is matched case-insensitively (HTTP auth
    schemes are case-insensitive), and surrounding whitespace is stripped
    from the credentials.

    Args:
        request: Incoming request whose ``headers`` mapping is inspected.

    Returns:
        The token string, or ``""`` when the header is missing, uses another
        scheme, or carries no credentials.
    """
    auth = request.headers.get("Authorization", "")
    scheme, _, credentials = auth.strip().partition(" ")
    if scheme.lower() != "bearer":
        return ""
    return credentials.strip()
def _get_credentials(token: str) -> dict:
    """Fetch HuggingFace credentials for the caller from the admin Worker.

    Args:
        token: JWT forwarded to the Worker as a Bearer token.

    Returns:
        The Worker's JSON object, guaranteed to contain the ``hf_token``,
        ``repo`` and ``path_prefix`` keys that the callers index.

    Raises:
        ValueError: The Worker could not be reached, answered with a non-200
            status, or returned a body that is not a JSON object holding the
            required keys.
    """
    try:
        with httpx.Client(timeout=15) as client:
            resp = client.get(
                f"{ADMIN_API_URL}/backup/credentials",
                headers={"Authorization": f"Bearer {token}"},
            )
    except httpx.HTTPError as e:
        # Callers translate ValueError into a 502; surface transport failures
        # the same way instead of letting them escape as a 500.
        raise ValueError(f"Khong the ket noi: {e}") from e

    if resp.status_code != 200:
        message = None
        if "application/json" in resp.headers.get("content-type", ""):
            try:
                body = resp.json()
            except ValueError:
                body = None
            # The error body may be a list/string/etc.; only objects have "error".
            if isinstance(body, dict):
                message = body.get("error")
        else:
            message = resp.text
        raise ValueError(str(message) if message else f"Worker error: {resp.status_code}")

    # json() raises a ValueError subclass on malformed JSON, which is the
    # exception type callers already handle.
    creds = resp.json()
    if not isinstance(creds, dict):
        raise ValueError("Worker error: credentials response is not a JSON object")
    missing = [key for key in ("hf_token", "repo", "path_prefix") if key not in creds]
    if missing:
        raise ValueError(f"Worker error: credentials response missing {', '.join(missing)}")
    return creds
def _log_action(token: str, zone_name: str, action: str, status: str, file_path: str = ""):
    """Report a backup/restore event to the Worker's log endpoint.

    Strictly best-effort: any failure while posting is ignored so that
    logging can never break the backup or restore that triggered it.
    """
    try:
        endpoint = f"{ADMIN_API_URL}/backup/log"
        auth_headers = {"Authorization": f"Bearer {token}"}
        event = {
            "zone_name": zone_name,
            "action": action,
            "status": status,
            "file_path": file_path,
        }
        with httpx.Client(timeout=10) as http:
            http.post(endpoint, headers=auth_headers, json=event)
    except Exception:
        # Deliberately swallowed: the log entry is optional.
        pass
def _create_zone_archive(zone_name: str) -> Path:
    """Create ``BACKUP_DIR/<zone_name>.tar.gz`` from the zone's directory.

    The archive stores the zone under the top-level name ``zone_name``.

    Args:
        zone_name: Name of the zone directory inside ``DATA_DIR``.

    Returns:
        Path of the archive that was written.

    Raises:
        ValueError: The zone directory does not exist.
    """
    zone_path = DATA_DIR / zone_name
    if not zone_path.is_dir():
        raise ValueError(f"Zone '{zone_name}' khong ton tai")

    # tarfile.open() does not create parent directories.
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    archive_path = BACKUP_DIR / f"{zone_name}.tar.gz"
    try:
        with tarfile.open(archive_path, "w:gz") as tar:
            tar.add(str(zone_path), arcname=zone_name)
    except BaseException:
        # Do not leave a truncated archive behind when archiving fails.
        archive_path.unlink(missing_ok=True)
        raise
    return archive_path
# Module-level state shared by every backup/restore job and exposed by GET /status:
#   running  - True while a job holds the single job slot
#   last     - ISO timestamp written when a job completes successfully, else None
#   error    - str() of the exception from the most recent failed job, else None
#   progress - human-readable progress/result message
_backup_status: dict = {"running": False, "last": None, "error": None, "progress": ""}
@router.get("/status")
def backup_status():
    """Return the backup configuration flags plus the shared job status fields."""
    report = {
        "configured": bool(ADMIN_API_URL),
        "admin_url": ADMIN_API_URL or None,
    }
    # Copy the job-state fields through unchanged, in a fixed order.
    for field in ("running", "last", "error", "progress"):
        report[field] = _backup_status[field]
    return report
@router.get("/list")
async def list_backups(request: Request):
    """List the ``.tar.gz`` zone archives stored under the caller's HF path prefix.

    Returns:
        A list of dicts with ``zone_name``, ``file``, ``size``,
        ``last_modified`` and ``local_exists`` (whether the zone is present
        in the local metadata). Empty when the remote folder does not exist.

    Raises:
        HTTPException: 400 when ADMIN_API_URL is not configured, 401 when no
            bearer token is supplied, 502 when the Worker or the HuggingFace
            API fails.
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")

    suffix = ".tar.gz"
    try:
        # _get_credentials performs blocking HTTP; run it in the thread pool
        # so the event loop keeps serving other requests meanwhile.
        creds = await run_in_threadpool(_get_credentials, token)
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.get(
                f"{HF_API}/datasets/{creds['repo']}/tree/main/{creds['path_prefix']}",
                headers={"Authorization": f"Bearer {creds['hf_token']}"},
            )
        if resp.status_code == 404:
            return []
        if resp.status_code != 200:
            raise HTTPException(502, f"HF API error: {resp.status_code} {resp.text}")

        meta = load_meta()
        backups = []
        for entry in resp.json():
            if entry.get("type") != "file":
                continue
            path = entry.get("path", "")
            if not path.endswith(suffix):
                continue
            # Strip only the trailing suffix; str.replace would also remove
            # ".tar.gz" occurring in the middle of the file name.
            zone = path.rsplit("/", 1)[-1][: -len(suffix)]
            backups.append(
                {
                    "zone_name": zone,
                    "file": path,
                    "size": (entry.get("lfs") or {}).get("size") or entry.get("size", 0),
                    "last_modified": (entry.get("lastCommit") or {}).get("date", ""),
                    "local_exists": zone in meta,
                }
            )
        return backups
    except ValueError as e:
        raise HTTPException(502, str(e)) from e
    except httpx.HTTPError as e:
        raise HTTPException(502, f"Khong the ket noi: {e}") from e
def _upload_to_hf(creds: dict, zone_name: str, archive_path: Path):
    """Push a zone archive into the HuggingFace dataset repo named in ``creds``.

    The file is stored at ``<path_prefix>/<zone_name>.tar.gz`` using the
    ``hf_token`` from ``creds``.
    """
    # Function-scope import: huggingface_hub is only loaded when an upload runs.
    from huggingface_hub import HfApi

    destination = f"{creds['path_prefix']}/{zone_name}.tar.gz"
    hub = HfApi(token=creds["hf_token"])
    upload_args = dict(
        path_or_fileobj=str(archive_path),
        path_in_repo=destination,
        repo_id=creds["repo"],
        repo_type="dataset",
        commit_message=f"Backup zone: {zone_name}",
    )
    hub.upload_file(**upload_args)
@router.post("/zone/{zone_name}")
async def backup_zone(zone_name: str, request: Request, background_tasks: BackgroundTasks):
    """Start a background backup of one zone to the HuggingFace dataset.

    Credentials are fetched before the task is scheduled so an invalid JWT is
    reported to the caller right away. Progress and the final result are
    published through the shared ``_backup_status`` dict.

    Raises:
        HTTPException: 400 (not configured / invalid or unknown zone),
            401 (no token), 409 (another job is running), 502 (Worker failure).
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    try:
        validate_zone_name(zone_name)
        if not (DATA_DIR / zone_name).is_dir():
            raise ValueError(f"Zone '{zone_name}' khong ton tai")
    except ValueError as e:
        raise HTTPException(400, str(e)) from e
    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup khac dang chay")

    # Claim the single job slot immediately. If the flag were only set once the
    # background task starts, a second request arriving in between would also
    # pass the 409 check and both jobs would write the same archive path.
    _backup_status["running"] = True
    try:
        # Blocking HTTP call: keep it off the event loop.
        creds = await run_in_threadpool(_get_credentials, token)
    except BaseException as e:
        # The job never started, so release the slot before reporting.
        _backup_status["running"] = False
        if isinstance(e, ValueError):
            raise HTTPException(502, str(e)) from e
        if isinstance(e, httpx.HTTPError):
            raise HTTPException(502, f"Khong the ket noi: {e}") from e
        raise

    def _run():
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = f"Dang backup zone: {zone_name}..."
        try:
            archive_path = _create_zone_archive(zone_name)
            try:
                _upload_to_hf(creds, zone_name, archive_path)
            finally:
                # The local archive is only a staging file; always remove it.
                archive_path.unlink(missing_ok=True)
            _log_action(token, zone_name, "backup", "success",
                        f"{creds['path_prefix']}/{zone_name}.tar.gz")
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = f"Backup zone {zone_name} thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi backup: {e}"
            _log_action(token, zone_name, "backup", "error")
        finally:
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": f"Dang backup zone {zone_name} trong nen..."}
@router.post("/all")
async def backup_all(request: Request, background_tasks: BackgroundTasks):
    """Start a background backup of every zone in the local metadata.

    Zones listed in the metadata but without a directory under ``DATA_DIR``
    are skipped. The job stops at the first zone that fails; that failure is
    recorded in ``_backup_status`` and reported to the Worker log.

    Raises:
        HTTPException: 400 (not configured), 401 (no token),
            409 (another job is running), 502 (Worker failure).
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup khac dang chay")

    # Claim the job slot before any await so a concurrent request cannot pass
    # the 409 check while this one is still fetching credentials.
    _backup_status["running"] = True
    try:
        # Blocking HTTP call: keep it off the event loop.
        creds = await run_in_threadpool(_get_credentials, token)
    except BaseException as e:
        # The job never started, so release the slot before reporting.
        _backup_status["running"] = False
        if isinstance(e, ValueError):
            raise HTTPException(502, str(e)) from e
        if isinstance(e, httpx.HTTPError):
            raise HTTPException(502, f"Khong the ket noi: {e}") from e
        raise

    def _run():
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = "Dang backup tat ca zones..."
        current_zone = None
        try:
            # Count only zones that can actually be archived so the
            # "(i/total)" progress text is accurate.
            zones = [name for name in load_meta() if (DATA_DIR / name).is_dir()]
            total = len(zones)
            for position, zone_name in enumerate(zones, start=1):
                current_zone = zone_name
                _backup_status["progress"] = f"Dang backup zone {zone_name} ({position}/{total})..."
                archive_path = _create_zone_archive(zone_name)
                try:
                    _upload_to_hf(creds, zone_name, archive_path)
                finally:
                    # The local archive is only a staging file; always remove it.
                    archive_path.unlink(missing_ok=True)
                _log_action(token, zone_name, "backup", "success",
                            f"{creds['path_prefix']}/{zone_name}.tar.gz")
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = "Backup tat ca zones thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi backup: {e}"
            # Mirror the single-zone endpoint: log which zone failed.
            if current_zone is not None:
                _log_action(token, current_zone, "backup", "error")
        finally:
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": "Dang backup tat ca zones trong nen..."}
def _download_from_hf(creds: dict, zone_name: str) -> bytes:
    """Fetch ``<path_prefix>/<zone_name>.tar.gz`` from the HF dataset repo.

    Returns:
        The raw archive bytes.

    Raises:
        FileNotFoundError: HuggingFace answered 404 for the archive.
        ValueError: HuggingFace answered with any other non-200 status.
    """
    remote_path = f"{creds['path_prefix']}/{zone_name}.tar.gz"
    url = f"https://huggingface.co/datasets/{creds['repo']}/resolve/main/{remote_path}"
    auth_headers = {"Authorization": f"Bearer {creds['hf_token']}"}

    with httpx.Client(timeout=300, follow_redirects=True) as http:
        resp = http.get(url, headers=auth_headers)

    status = resp.status_code
    if status == 404:
        raise FileNotFoundError(f"Backup zone '{zone_name}' khong ton tai")
    if status != 200:
        raise ValueError(f"HF download error: {status}")
    return resp.content
def _check_archive_member(member_name: str, zone_name: str) -> None:
    """Raise ValueError unless ``member_name`` lies inside the ``zone_name/`` tree."""
    normalized = os.path.normpath(member_name)
    if normalized.startswith("..") or os.path.isabs(normalized):
        raise ValueError(f"Archive chua path khong an toan: {member_name}")
    # Compare the first path component exactly: a plain prefix test would let
    # zone "foo" accept members belonging to a sibling zone "foobar".
    top_level = normalized.replace("\\", "/").split("/", 1)[0]
    if top_level != zone_name:
        raise ValueError(f"Archive chua path ngoai zone: {member_name}")


def _restore_zone_archive(creds: dict, zone_name: str) -> None:
    """Download one zone archive and replace the local zone with its contents.

    The archive is opened and every member is validated before the existing
    zone directory is removed, so a corrupt or unsafe archive cannot destroy
    local data. The zone is added to the metadata when it is not listed yet.

    Raises:
        FileNotFoundError: No archive exists for the zone (from the download).
        ValueError: Unusable zone name, HF download error, or unsafe archive.
    """
    # DATA_DIR / "" is DATA_DIR itself and ".." is its parent; never hand such
    # a path to rmtree, whatever the caller validated.
    if zone_name in ("", ".", "..") or "/" in zone_name or "\\" in zone_name:
        raise ValueError(f"Ten zone khong hop le: '{zone_name}'")

    data = _download_from_hf(creds, zone_name)
    BACKUP_DIR.mkdir(parents=True, exist_ok=True)
    archive_path = BACKUP_DIR / f"{zone_name}.tar.gz"
    try:
        archive_path.write_bytes(data)
        with tarfile.open(archive_path, "r:gz") as tar:
            for member in tar.getmembers():
                _check_archive_member(member.name, zone_name)
            # Only now, with a readable and safe archive, drop the old data.
            zone_path = DATA_DIR / zone_name
            if zone_path.exists():
                shutil.rmtree(zone_path)
            zone_path.mkdir(parents=True, exist_ok=True)
            tar.extractall(path=str(DATA_DIR), filter="data")
        meta = load_meta()
        if zone_name not in meta:
            meta[zone_name] = {"description": "Restored from backup", "created": datetime.now().isoformat()}
            save_meta(meta)
    finally:
        # The downloaded archive is only a staging file; always remove it.
        archive_path.unlink(missing_ok=True)


@router.post("/restore/{zone_name}")
async def restore_zone(zone_name: str, request: Request, background_tasks: BackgroundTasks):
    """Start a background restore of one zone from its HuggingFace archive.

    Any existing local copy of the zone is replaced. Progress and the final
    result are published through the shared ``_backup_status`` dict.

    Raises:
        HTTPException: 400 (not configured / invalid zone name), 401 (no token),
            409 (another job is running), 502 (Worker failure).
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    try:
        validate_zone_name(zone_name)
    except ValueError as e:
        raise HTTPException(400, str(e)) from e
    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup/restore khac dang chay")

    # Claim the job slot before any await so a concurrent request cannot pass
    # the 409 check while this one is still fetching credentials.
    _backup_status["running"] = True
    try:
        # Blocking HTTP call: keep it off the event loop.
        creds = await run_in_threadpool(_get_credentials, token)
    except BaseException as e:
        # The job never started, so release the slot before reporting.
        _backup_status["running"] = False
        if isinstance(e, ValueError):
            raise HTTPException(502, str(e)) from e
        if isinstance(e, httpx.HTTPError):
            raise HTTPException(502, f"Khong the ket noi: {e}") from e
        raise

    def _run():
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = f"Dang restore zone: {zone_name}..."
        try:
            _restore_zone_archive(creds, zone_name)
            _log_action(token, zone_name, "restore", "success",
                        f"{creds['path_prefix']}/{zone_name}.tar.gz")
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = f"Restore zone {zone_name} thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi restore: {e}"
            _log_action(token, zone_name, "restore", "error")
        finally:
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": f"Dang restore zone {zone_name} trong nen..."}


@router.post("/restore-all")
async def restore_all(request: Request, background_tasks: BackgroundTasks):
    """Start a background restore of every archive under the caller's HF prefix.

    Zone names are derived from the remote file names, so each one is checked
    with ``validate_zone_name`` and skipped when invalid. Archives that have
    disappeared (404) are skipped too. The job stops at the first zone that
    fails; successes and that failure are reported to the Worker log.

    Raises:
        HTTPException: 400 (not configured), 401 (no token),
            409 (another job is running), 502 (Worker failure).
    """
    if not ADMIN_API_URL:
        raise HTTPException(400, "ADMIN_API_URL chua duoc cau hinh")
    token = _get_token(request)
    if not token:
        raise HTTPException(401, "Chua dang nhap")
    if _backup_status["running"]:
        raise HTTPException(409, "Dang co backup/restore khac dang chay")

    # Claim the job slot before any await so a concurrent request cannot pass
    # the 409 check while this one is still fetching credentials.
    _backup_status["running"] = True
    try:
        # Blocking HTTP call: keep it off the event loop.
        creds = await run_in_threadpool(_get_credentials, token)
    except BaseException as e:
        # The job never started, so release the slot before reporting.
        _backup_status["running"] = False
        if isinstance(e, ValueError):
            raise HTTPException(502, str(e)) from e
        if isinstance(e, httpx.HTTPError):
            raise HTTPException(502, f"Khong the ket noi: {e}") from e
        raise

    def _run():
        _backup_status["running"] = True
        _backup_status["error"] = None
        _backup_status["progress"] = "Dang restore tat ca zones..."
        current_zone = None
        suffix = ".tar.gz"
        try:
            # List backups from HF directly
            with httpx.Client(timeout=30) as client:
                resp = client.get(
                    f"{HF_API}/datasets/{creds['repo']}/tree/main/{creds['path_prefix']}",
                    headers={"Authorization": f"Bearer {creds['hf_token']}"},
                )
            if resp.status_code == 404:
                _backup_status["progress"] = "Khong co backup nao"
                return
            if resp.status_code != 200:
                raise ValueError(f"HF API error: {resp.status_code}")

            # Strip only the trailing suffix so the derived name maps back to
            # the exact file that _download_from_hf will request.
            zone_names = [
                entry["path"].split("/")[-1][: -len(suffix)]
                for entry in resp.json()
                if entry.get("type") == "file" and entry["path"].endswith(suffix)
            ]
            total = len(zone_names)
            done = 0
            for zn in zone_names:
                _backup_status["progress"] = f"Dang restore zone {zn} ({done + 1}/{total})..."
                try:
                    # Remote file names are untrusted input for local paths.
                    validate_zone_name(zn)
                except ValueError:
                    continue
                current_zone = zn
                try:
                    _restore_zone_archive(creds, zn)
                except FileNotFoundError:
                    continue
                _log_action(token, zn, "restore", "success",
                            f"{creds['path_prefix']}/{zn}.tar.gz")
                done += 1
            _backup_status["last"] = datetime.now().isoformat()
            _backup_status["progress"] = f"Restore {done}/{total} zones thanh cong"
        except Exception as e:
            _backup_status["error"] = str(e)
            _backup_status["progress"] = f"Loi restore: {e}"
            # Mirror the single-zone endpoint: log which zone failed.
            if current_zone is not None:
                _log_action(token, current_zone, "restore", "error")
        finally:
            _backup_status["running"] = False

    background_tasks.add_task(_run)
    return {"ok": True, "message": "Dang restore tat ca zones trong nen..."}