kokokoasd commited on
Commit
cbc776e
·
verified ·
1 Parent(s): da9f558

Upload 20 files

Browse files
Files changed (1) hide show
  1. routers/backup.py +149 -63
routers/backup.py CHANGED
@@ -1,9 +1,9 @@
1
  """
2
- Backup & Restore API - proxies through the Admin Worker (Cloudflare Worker).
3
 
4
- The Admin Worker handles HuggingFace Dataset storage with per-user folders.
5
- This router creates local tar.gz archives and sends them to the Worker,
6
- or downloads archives from the Worker and extracts them locally.
7
 
8
  Requires env var:
9
  ADMIN_API_URL - URL of the Cloudflare Worker admin API
@@ -23,15 +23,7 @@ from storage import load_meta, save_meta, validate_zone_name
23
 
24
  router = APIRouter(prefix="/api/backup", tags=["backup"])
25
 
26
-
27
- def _worker_headers(token: str) -> dict:
28
- """Build headers for Worker API calls."""
29
- return {"Authorization": f"Bearer {token}"}
30
-
31
-
32
- def _url(path: str) -> str:
33
- """Build the full Worker URL for a given path."""
34
- return f"{ADMIN_API_URL}{path}"
35
 
36
 
37
  def _get_token(request: Request) -> str:
@@ -42,6 +34,32 @@ def _get_token(request: Request) -> str:
42
  return ""
43
 
44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
  def _create_zone_archive(zone_name: str) -> Path:
46
  """Create a tar.gz archive of a zone directory."""
47
  zone_path = DATA_DIR / zone_name
@@ -77,14 +95,47 @@ async def list_backups(request: Request):
77
  if not token:
78
  raise HTTPException(401, "Chua dang nhap")
79
  try:
 
80
  async with httpx.AsyncClient(timeout=30) as client:
81
- resp = await client.get(_url("/backup/list"), headers=_worker_headers(token))
 
 
 
 
 
82
  if resp.status_code != 200:
83
- data = resp.json() if resp.headers.get("content-type", "").startswith("application/json") else {"error": resp.text}
84
- raise HTTPException(resp.status_code, data.get("error", "Worker error"))
85
- return resp.json()
 
 
 
 
 
 
 
 
 
 
86
  except httpx.HTTPError as e:
87
- raise HTTPException(502, f"Khong the ket noi Worker: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
 
89
 
90
  @router.post("/zone/{zone_name}")
@@ -103,6 +154,12 @@ async def backup_zone(zone_name: str, request: Request, background_tasks: Backgr
103
  if _backup_status["running"]:
104
  raise HTTPException(409, "Dang co backup khac dang chay")
105
 
 
 
 
 
 
 
106
  def _run():
107
  _backup_status["running"] = True
108
  _backup_status["error"] = None
@@ -110,22 +167,17 @@ async def backup_zone(zone_name: str, request: Request, background_tasks: Backgr
110
  try:
111
  archive_path = _create_zone_archive(zone_name)
112
  try:
113
- with httpx.Client(timeout=300) as client:
114
- with open(archive_path, "rb") as f:
115
- resp = client.post(
116
- _url(f"/backup/upload/{zone_name}"),
117
- headers={**_worker_headers(token), "Content-Type": "application/octet-stream"},
118
- content=f.read(),
119
- )
120
- if resp.status_code != 200:
121
- raise ValueError(f"Worker error: {resp.text}")
122
  finally:
123
  archive_path.unlink(missing_ok=True)
 
 
124
  _backup_status["last"] = datetime.now().isoformat()
125
  _backup_status["progress"] = f"Backup zone {zone_name} thanh cong"
126
  except Exception as e:
127
  _backup_status["error"] = str(e)
128
  _backup_status["progress"] = f"Loi backup: {e}"
 
129
  finally:
130
  _backup_status["running"] = False
131
 
@@ -143,6 +195,11 @@ async def backup_all(request: Request, background_tasks: BackgroundTasks):
143
  if _backup_status["running"]:
144
  raise HTTPException(409, "Dang co backup khac dang chay")
145
 
 
 
 
 
 
146
  def _run():
147
  _backup_status["running"] = True
148
  _backup_status["error"] = None
@@ -158,17 +215,11 @@ async def backup_all(request: Request, background_tasks: BackgroundTasks):
158
  _backup_status["progress"] = f"Dang backup zone {zone_name} ({done + 1}/{total})..."
159
  archive_path = _create_zone_archive(zone_name)
160
  try:
161
- with httpx.Client(timeout=300) as client:
162
- with open(archive_path, "rb") as f:
163
- resp = client.post(
164
- _url(f"/backup/upload/{zone_name}"),
165
- headers={**_worker_headers(token), "Content-Type": "application/octet-stream"},
166
- content=f.read(),
167
- )
168
- if resp.status_code != 200:
169
- raise ValueError(f"Worker error for {zone_name}: {resp.text}")
170
  finally:
171
  archive_path.unlink(missing_ok=True)
 
 
172
  done += 1
173
  _backup_status["last"] = datetime.now().isoformat()
174
  _backup_status["progress"] = "Backup tat ca zones thanh cong"
@@ -182,6 +233,22 @@ async def backup_all(request: Request, background_tasks: BackgroundTasks):
182
  return {"ok": True, "message": "Dang backup tat ca zones trong nen..."}
183
 
184
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  @router.post("/restore/{zone_name}")
186
  async def restore_zone(zone_name: str, request: Request, background_tasks: BackgroundTasks):
187
  if not ADMIN_API_URL:
@@ -196,19 +263,19 @@ async def restore_zone(zone_name: str, request: Request, background_tasks: Backg
196
  if _backup_status["running"]:
197
  raise HTTPException(409, "Dang co backup/restore khac dang chay")
198
 
 
 
 
 
 
199
  def _run():
200
  _backup_status["running"] = True
201
  _backup_status["error"] = None
202
  _backup_status["progress"] = f"Dang restore zone: {zone_name}..."
203
  try:
204
- with httpx.Client(timeout=300) as client:
205
- resp = client.get(_url(f"/backup/download/{zone_name}"), headers=_worker_headers(token))
206
- if resp.status_code == 404:
207
- raise ValueError(f"Backup zone '{zone_name}' khong ton tai")
208
- if resp.status_code != 200:
209
- raise ValueError(f"Worker error: {resp.text}")
210
- archive_path = BACKUP_DIR / f"{zone_name}.tar.gz"
211
- archive_path.write_bytes(resp.content)
212
 
213
  try:
214
  zone_path = DATA_DIR / zone_name
@@ -229,11 +296,14 @@ async def restore_zone(zone_name: str, request: Request, background_tasks: Backg
229
  finally:
230
  archive_path.unlink(missing_ok=True)
231
 
 
 
232
  _backup_status["last"] = datetime.now().isoformat()
233
  _backup_status["progress"] = f"Restore zone {zone_name} thanh cong"
234
  except Exception as e:
235
  _backup_status["error"] = str(e)
236
  _backup_status["progress"] = f"Loi restore: {e}"
 
237
  finally:
238
  _backup_status["running"] = False
239
 
@@ -251,31 +321,47 @@ async def restore_all(request: Request, background_tasks: BackgroundTasks):
251
  if _backup_status["running"]:
252
  raise HTTPException(409, "Dang co backup/restore khac dang chay")
253
 
 
 
 
 
 
254
  def _run():
255
  _backup_status["running"] = True
256
  _backup_status["error"] = None
257
  _backup_status["progress"] = "Dang restore tat ca zones..."
258
  try:
 
259
  with httpx.Client(timeout=30) as client:
260
- resp = client.get(_url("/backup/list"), headers=_worker_headers(token))
261
- if resp.status_code != 200:
262
- raise ValueError(f"Khong the lay danh sach backup: {resp.text}")
263
- backup_list = resp.json()
264
-
265
- total = len(backup_list)
 
 
 
 
 
 
 
 
 
 
266
  done = 0
267
- for b in backup_list:
268
- zone_name = b["zone_name"]
269
- _backup_status["progress"] = f"Dang restore zone {zone_name} ({done + 1}/{total})..."
270
- with httpx.Client(timeout=300) as client:
271
- resp = client.get(_url(f"/backup/download/{zone_name}"), headers=_worker_headers(token))
272
- if resp.status_code != 200:
273
- continue
274
- archive_path = BACKUP_DIR / f"{zone_name}.tar.gz"
275
- archive_path.write_bytes(resp.content)
276
 
277
  try:
278
- zone_path = DATA_DIR / zone_name
279
  if zone_path.exists():
280
  shutil.rmtree(zone_path)
281
  with tarfile.open(archive_path, "r:gz") as tar:
@@ -283,12 +369,12 @@ async def restore_all(request: Request, background_tasks: BackgroundTasks):
283
  member_path = os.path.normpath(member.name)
284
  if member_path.startswith("..") or os.path.isabs(member_path):
285
  raise ValueError(f"Archive chua path khong an toan: {member.name}")
286
- if not member_path.startswith(zone_name):
287
  raise ValueError(f"Archive chua path ngoai zone: {member.name}")
288
  tar.extractall(path=str(DATA_DIR), filter="data")
289
  meta = load_meta()
290
- if zone_name not in meta:
291
- meta[zone_name] = {"description": "", "created": datetime.now().isoformat()}
292
  save_meta(meta)
293
  finally:
294
  archive_path.unlink(missing_ok=True)
 
1
  """
2
+ Backup & Restore API talks directly to HuggingFace Dataset API.
3
 
4
+ Flow:
5
+ 1. Space asks Worker for HF credentials (token, repo, user path prefix)
6
+ 2. Space uploads/downloads/lists archives directly via HuggingFace API
7
 
8
  Requires env var:
9
  ADMIN_API_URL - URL of the Cloudflare Worker admin API
 
23
 
24
  router = APIRouter(prefix="/api/backup", tags=["backup"])
25
 
26
+ HF_API = "https://huggingface.co/api"
 
 
 
 
 
 
 
 
27
 
28
 
29
  def _get_token(request: Request) -> str:
 
34
  return ""
35
 
36
 
37
+ def _get_credentials(token: str) -> dict:
38
+ """Get HF credentials from Worker. Returns {hf_token, repo, path_prefix}."""
39
+ with httpx.Client(timeout=15) as client:
40
+ resp = client.get(
41
+ f"{ADMIN_API_URL}/backup/credentials",
42
+ headers={"Authorization": f"Bearer {token}"},
43
+ )
44
+ if resp.status_code != 200:
45
+ data = resp.json() if "application/json" in resp.headers.get("content-type", "") else {"error": resp.text}
46
+ raise ValueError(data.get("error", f"Worker error: {resp.status_code}"))
47
+ return resp.json()
48
+
49
+
50
+ def _log_action(token: str, zone_name: str, action: str, status: str, file_path: str = ""):
51
+ """Log backup/restore action to Worker (best-effort)."""
52
+ try:
53
+ with httpx.Client(timeout=10) as client:
54
+ client.post(
55
+ f"{ADMIN_API_URL}/backup/log",
56
+ headers={"Authorization": f"Bearer {token}"},
57
+ json={"zone_name": zone_name, "action": action, "status": status, "file_path": file_path},
58
+ )
59
+ except Exception:
60
+ pass
61
+
62
+
63
  def _create_zone_archive(zone_name: str) -> Path:
64
  """Create a tar.gz archive of a zone directory."""
65
  zone_path = DATA_DIR / zone_name
 
95
  if not token:
96
  raise HTTPException(401, "Chua dang nhap")
97
  try:
98
+ creds = _get_credentials(token)
99
  async with httpx.AsyncClient(timeout=30) as client:
100
+ resp = await client.get(
101
+ f"{HF_API}/datasets/{creds['repo']}/tree/main/{creds['path_prefix']}",
102
+ headers={"Authorization": f"Bearer {creds['hf_token']}"},
103
+ )
104
+ if resp.status_code == 404:
105
+ return []
106
  if resp.status_code != 200:
107
+ raise HTTPException(502, f"HF API error: {resp.status_code} {resp.text}")
108
+ tree = resp.json()
109
+ return [
110
+ {
111
+ "zone_name": f["path"].split("/")[-1].replace(".tar.gz", ""),
112
+ "file": f["path"],
113
+ "size": (f.get("lfs") or {}).get("size") or f.get("size", 0),
114
+ }
115
+ for f in tree
116
+ if f.get("type") == "file" and f["path"].endswith(".tar.gz")
117
+ ]
118
+ except ValueError as e:
119
+ raise HTTPException(502, str(e))
120
  except httpx.HTTPError as e:
121
+ raise HTTPException(502, f"Khong the ket noi: {e}")
122
+
123
+
124
+ def _upload_to_hf(creds: dict, zone_name: str, archive_path: Path):
125
+ """Upload archive directly to HuggingFace Dataset API."""
126
+ file_path = f"{creds['path_prefix']}/{zone_name}.tar.gz"
127
+ hf_token = creds["hf_token"]
128
+ repo = creds["repo"]
129
+
130
+ with httpx.Client(timeout=300) as client:
131
+ with open(archive_path, "rb") as f:
132
+ resp = client.post(
133
+ f"{HF_API}/datasets/{repo}/upload/main/{file_path}",
134
+ headers={"Authorization": f"Bearer {hf_token}"},
135
+ content=f.read(),
136
+ )
137
+ if resp.status_code not in (200, 201):
138
+ raise ValueError(f"HF upload error: {resp.status_code} {resp.text}")
139
 
140
 
141
  @router.post("/zone/{zone_name}")
 
154
  if _backup_status["running"]:
155
  raise HTTPException(409, "Dang co backup khac dang chay")
156
 
157
+ # Fetch credentials before background task (validates JWT now)
158
+ try:
159
+ creds = _get_credentials(token)
160
+ except ValueError as e:
161
+ raise HTTPException(502, str(e))
162
+
163
  def _run():
164
  _backup_status["running"] = True
165
  _backup_status["error"] = None
 
167
  try:
168
  archive_path = _create_zone_archive(zone_name)
169
  try:
170
+ _upload_to_hf(creds, zone_name, archive_path)
 
 
 
 
 
 
 
 
171
  finally:
172
  archive_path.unlink(missing_ok=True)
173
+ _log_action(token, zone_name, "backup", "success",
174
+ f"{creds['path_prefix']}/{zone_name}.tar.gz")
175
  _backup_status["last"] = datetime.now().isoformat()
176
  _backup_status["progress"] = f"Backup zone {zone_name} thanh cong"
177
  except Exception as e:
178
  _backup_status["error"] = str(e)
179
  _backup_status["progress"] = f"Loi backup: {e}"
180
+ _log_action(token, zone_name, "backup", "error")
181
  finally:
182
  _backup_status["running"] = False
183
 
 
195
  if _backup_status["running"]:
196
  raise HTTPException(409, "Dang co backup khac dang chay")
197
 
198
+ try:
199
+ creds = _get_credentials(token)
200
+ except ValueError as e:
201
+ raise HTTPException(502, str(e))
202
+
203
  def _run():
204
  _backup_status["running"] = True
205
  _backup_status["error"] = None
 
215
  _backup_status["progress"] = f"Dang backup zone {zone_name} ({done + 1}/{total})..."
216
  archive_path = _create_zone_archive(zone_name)
217
  try:
218
+ _upload_to_hf(creds, zone_name, archive_path)
 
 
 
 
 
 
 
 
219
  finally:
220
  archive_path.unlink(missing_ok=True)
221
+ _log_action(token, zone_name, "backup", "success",
222
+ f"{creds['path_prefix']}/{zone_name}.tar.gz")
223
  done += 1
224
  _backup_status["last"] = datetime.now().isoformat()
225
  _backup_status["progress"] = "Backup tat ca zones thanh cong"
 
233
  return {"ok": True, "message": "Dang backup tat ca zones trong nen..."}
234
 
235
 
236
+ def _download_from_hf(creds: dict, zone_name: str) -> bytes:
237
+ """Download archive directly from HuggingFace Dataset."""
238
+ file_path = f"{creds['path_prefix']}/{zone_name}.tar.gz"
239
+ with httpx.Client(timeout=300) as client:
240
+ resp = client.get(
241
+ f"https://huggingface.co/datasets/{creds['repo']}/resolve/main/{file_path}",
242
+ headers={"Authorization": f"Bearer {creds['hf_token']}"},
243
+ follow_redirects=True,
244
+ )
245
+ if resp.status_code == 404:
246
+ raise FileNotFoundError(f"Backup zone '{zone_name}' khong ton tai")
247
+ if resp.status_code != 200:
248
+ raise ValueError(f"HF download error: {resp.status_code}")
249
+ return resp.content
250
+
251
+
252
  @router.post("/restore/{zone_name}")
253
  async def restore_zone(zone_name: str, request: Request, background_tasks: BackgroundTasks):
254
  if not ADMIN_API_URL:
 
263
  if _backup_status["running"]:
264
  raise HTTPException(409, "Dang co backup/restore khac dang chay")
265
 
266
+ try:
267
+ creds = _get_credentials(token)
268
+ except ValueError as e:
269
+ raise HTTPException(502, str(e))
270
+
271
  def _run():
272
  _backup_status["running"] = True
273
  _backup_status["error"] = None
274
  _backup_status["progress"] = f"Dang restore zone: {zone_name}..."
275
  try:
276
+ data = _download_from_hf(creds, zone_name)
277
+ archive_path = BACKUP_DIR / f"{zone_name}.tar.gz"
278
+ archive_path.write_bytes(data)
 
 
 
 
 
279
 
280
  try:
281
  zone_path = DATA_DIR / zone_name
 
296
  finally:
297
  archive_path.unlink(missing_ok=True)
298
 
299
+ _log_action(token, zone_name, "restore", "success",
300
+ f"{creds['path_prefix']}/{zone_name}.tar.gz")
301
  _backup_status["last"] = datetime.now().isoformat()
302
  _backup_status["progress"] = f"Restore zone {zone_name} thanh cong"
303
  except Exception as e:
304
  _backup_status["error"] = str(e)
305
  _backup_status["progress"] = f"Loi restore: {e}"
306
+ _log_action(token, zone_name, "restore", "error")
307
  finally:
308
  _backup_status["running"] = False
309
 
 
321
  if _backup_status["running"]:
322
  raise HTTPException(409, "Dang co backup/restore khac dang chay")
323
 
324
+ try:
325
+ creds = _get_credentials(token)
326
+ except ValueError as e:
327
+ raise HTTPException(502, str(e))
328
+
329
  def _run():
330
  _backup_status["running"] = True
331
  _backup_status["error"] = None
332
  _backup_status["progress"] = "Dang restore tat ca zones..."
333
  try:
334
+ # List backups from HF directly
335
  with httpx.Client(timeout=30) as client:
336
+ resp = client.get(
337
+ f"{HF_API}/datasets/{creds['repo']}/tree/main/{creds['path_prefix']}",
338
+ headers={"Authorization": f"Bearer {creds['hf_token']}"},
339
+ )
340
+ if resp.status_code == 404:
341
+ _backup_status["progress"] = "Khong co backup nao"
342
+ return
343
+ if resp.status_code != 200:
344
+ raise ValueError(f"HF API error: {resp.status_code}")
345
+
346
+ tree = resp.json()
347
+ backup_files = [
348
+ f for f in tree
349
+ if f.get("type") == "file" and f["path"].endswith(".tar.gz")
350
+ ]
351
+ total = len(backup_files)
352
  done = 0
353
+ for bf in backup_files:
354
+ zn = bf["path"].split("/")[-1].replace(".tar.gz", "")
355
+ _backup_status["progress"] = f"Dang restore zone {zn} ({done + 1}/{total})..."
356
+ try:
357
+ data = _download_from_hf(creds, zn)
358
+ except FileNotFoundError:
359
+ continue
360
+ archive_path = BACKUP_DIR / f"{zn}.tar.gz"
361
+ archive_path.write_bytes(data)
362
 
363
  try:
364
+ zone_path = DATA_DIR / zn
365
  if zone_path.exists():
366
  shutil.rmtree(zone_path)
367
  with tarfile.open(archive_path, "r:gz") as tar:
 
369
  member_path = os.path.normpath(member.name)
370
  if member_path.startswith("..") or os.path.isabs(member_path):
371
  raise ValueError(f"Archive chua path khong an toan: {member.name}")
372
+ if not member_path.startswith(zn):
373
  raise ValueError(f"Archive chua path ngoai zone: {member.name}")
374
  tar.extractall(path=str(DATA_DIR), filter="data")
375
  meta = load_meta()
376
+ if zn not in meta:
377
+ meta[zn] = {"description": "", "created": datetime.now().isoformat()}
378
  save_meta(meta)
379
  finally:
380
  archive_path.unlink(missing_ok=True)