tao-shen Claude Opus 4.6 committed on
Commit
5c0994f
Β·
1 Parent(s): b97023a

fix: replace hanging upload_large_folder with per-dir upload + rate limiting

Browse files

upload_large_folder hangs after ~50 commits (known issue). Reverted to
per-directory upload_folder approach with:
- Split dirs >3000 files into sub-dirs recursively
- 3s delay between uploads (rate limiting)
- Retry with backoff (3 attempts, 5s/10s waits between them)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Files changed (1) hide show
  1. entrypoint.py +102 -19
entrypoint.py CHANGED
@@ -259,27 +259,110 @@ def save_and_upload():
259
  shutil.rmtree(full, ignore_errors=True)
260
  dirnames.remove(d)
261
 
262
- # Upload entire /data/ to HF dataset using upload_large_folder.
263
- # Handles chunking, dedup, retries, and multi-commit internally.
264
  from huggingface_hub import HfApi
265
  api = HfApi(token=HF_TOKEN)
266
- t0_up = time.time()
267
- try:
268
- rc, count = run(f"find {PERSIST_PATH} -type f -not -path '*/.cache/*' -not -path '*/.git/*' | wc -l")
269
- log(f" uploading {count.strip()} files via upload_large_folder ...")
270
- api.upload_large_folder(
271
- folder_path=PERSIST_PATH,
272
- repo_id=HF_DATASET_REPO,
273
- repo_type="dataset",
274
- ignore_patterns=UPLOAD_IGNORE,
275
- print_report=False,
276
- )
277
- elapsed_up = time.time() - t0_up
278
- log(f"══ SYNC: done ({elapsed_up:.1f}s) ══")
279
- except Exception as e:
280
- elapsed_up = time.time() - t0_up
281
- err_short = str(e).split('\n')[0][:120]
282
- log(f"══ SYNC: upload failed ({elapsed_up:.1f}s): {err_short} ══")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
283
 
284
 
285
  # Event to signal restore completion (sync must wait)
 
259
  shutil.rmtree(full, ignore_errors=True)
260
  dirnames.remove(d)
261
 
262
+ # Upload per-directory with retry + rate limiting
 
263
  from huggingface_hub import HfApi
264
  api = HfApi(token=HF_TOKEN)
265
+ ts = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
266
+ t0_all = time.time()
267
+ ok_count = 0
268
+ fail_count = 0
269
+
270
+ def count_files(path):
271
+ try:
272
+ return int(subprocess.check_output(
273
+ f"find '{path}' -type f | wc -l", shell=True, text=True).strip())
274
+ except Exception:
275
+ return 0
276
+
277
+ def upload_one(local_path, repo_path, max_retries=3):
278
+ """Upload a single directory with retry + backoff."""
279
+ nonlocal ok_count, fail_count
280
+ fc = count_files(local_path)
281
+ if fc == 0:
282
+ return
283
+ for attempt in range(max_retries):
284
+ t0 = time.time()
285
+ try:
286
+ api.upload_folder(
287
+ folder_path=local_path,
288
+ repo_id=HF_DATASET_REPO,
289
+ repo_type="dataset",
290
+ path_in_repo=repo_path,
291
+ commit_message=f"sync {ts}: {repo_path}/",
292
+ ignore_patterns=UPLOAD_IGNORE,
293
+ )
294
+ log(f" {repo_path}/ ok ({time.time()-t0:.1f}s, {fc} files)")
295
+ ok_count += 1
296
+ return
297
+ except Exception as e:
298
+ err = str(e).split('\n')[0][:100]
299
+ if attempt < max_retries - 1:
300
+ wait = (attempt + 1) * 5
301
+ log(f" {repo_path}/ retry {attempt+1} ({time.time()-t0:.1f}s): {err}")
302
+ time.sleep(wait)
303
+ else:
304
+ log(f" {repo_path}/ FAILED ({time.time()-t0:.1f}s): {err}")
305
+ fail_count += 1
306
+
307
+ def upload_dir(local_path, repo_path):
308
+ """Upload directory, splitting into sub-dirs if >3000 files."""
309
+ fc = count_files(local_path)
310
+ if fc == 0:
311
+ return
312
+ if fc <= 3000:
313
+ upload_one(local_path, repo_path)
314
+ time.sleep(3) # rate limit
315
+ else:
316
+ log(f" {repo_path}/ ({fc} files) β†’ splitting into sub-dirs")
317
+ # Upload loose files at this level
318
+ loose = [f for f in os.listdir(local_path)
319
+ if os.path.isfile(os.path.join(local_path, f))
320
+ and not f.startswith('.')]
321
+ if loose:
322
+ for f in loose:
323
+ try:
324
+ api.upload_file(
325
+ path_or_fileobj=os.path.join(local_path, f),
326
+ path_in_repo=f"{repo_path}/{f}",
327
+ repo_id=HF_DATASET_REPO,
328
+ repo_type="dataset",
329
+ commit_message=f"sync {ts}: {repo_path}/{f}",
330
+ )
331
+ except Exception:
332
+ pass
333
+ # Recurse into sub-dirs
334
+ for d in sorted(os.listdir(local_path)):
335
+ dp = os.path.join(local_path, d)
336
+ if os.path.isdir(dp) and not d.startswith('.'):
337
+ rp = f"{repo_path}/{d}"
338
+ upload_dir(dp, rp)
339
+
340
+ # Upload root-level files
341
+ root_files = [f for f in os.listdir(PERSIST_PATH)
342
+ if os.path.isfile(os.path.join(PERSIST_PATH, f))
343
+ and not f.startswith('.')]
344
+ for rf in root_files:
345
+ try:
346
+ api.upload_file(
347
+ path_or_fileobj=os.path.join(PERSIST_PATH, rf),
348
+ path_in_repo=rf,
349
+ repo_id=HF_DATASET_REPO,
350
+ repo_type="dataset",
351
+ commit_message=f"sync {ts}: {rf}",
352
+ )
353
+ except Exception as e:
354
+ log(f" {rf} failed: {str(e)[:80]}")
355
+ if root_files:
356
+ log(f" uploaded {len(root_files)} root files")
357
+
358
+ # Upload each top-level directory
359
+ for d in sorted(os.listdir(PERSIST_PATH)):
360
+ dp = os.path.join(PERSIST_PATH, d)
361
+ if os.path.isdir(dp) and not d.startswith('.'):
362
+ upload_dir(dp, d)
363
+
364
+ elapsed_all = time.time() - t0_all
365
+ log(f"══ SYNC: done ({elapsed_all:.1f}s) β€” {ok_count} ok, {fail_count} failed ══")
366
 
367
 
368
  # Event to signal restore completion (sync must wait)