OpenTransformer/agillm42-checkpoints / code /federation /agillm41_bucket_sync_loop.sh
OpenTransformer's picture
download
raw
6.32 kB
#!/usr/bin/env bash
# Additive HF Xet-bucket backup of AGILLM-4.1: syncs the latest checkpoint (plus its
# sidecar files and golden_verified.*) AND a clean snapshot of the 4.1 runtime code
# into one self-contained bucket
# (OpenTransformer/agillm42-checkpoints), so a checkpoint always travels with the
# exact code that loads it. Separate from the legacy OpenTransformer/AGILLM-4 model
# repo (old code + possibly-incompatible checkpoints). Runs ALONGSIDE the existing
# AGILLM-4 uploader (which stays as fallback). Xet dedup => only changed chunks ship
# (measured: 13GB ckpts -> 74MB first sync, ~0.4s when unchanged).
# Needs an isolated venv with huggingface_hub>=1.x (trainer's 0.36.2 env untouched):
# python3 -m venv /workspace/hfbucket_venv
# /workspace/hfbucket_venv/bin/pip install "huggingface_hub[hf_xet]==1.18.0"
# Launch: tmux new-session -d -s bucket_sync /workspace/agillm41_bucket_sync_loop.sh
# ---- Settings: each value can be overridden through its AGILLM41_* environment variable ----
# Source locations: trainer checkpoint directory and the mainline code checkout.
CKPT_DIR="${AGILLM41_SAVE_DIR:-/workspace/agillm4_4090_ckpts_active}"
MAINLINE="${AGILLM41_MAINLINE:-/workspace/agillm41-mainline}"
# Local staging directories whose contents are synced to the bucket.
STAGE="${AGILLM41_BUCKET_CODE_STAGE:-/workspace/agillm41_bucket_code/code}"
CKPT_STAGE="${AGILLM41_BUCKET_CKPT_STAGE:-/workspace/agillm41_bucket_ckpt_stage}"
# Destination bucket, seconds between ticks, isolated venv, and log file.
BUCKET="${AGILLM41_CKPT_BUCKET:-OpenTransformer/agillm42-checkpoints}"
INTERVAL="${AGILLM41_BUCKET_SYNC_SEC:-600}"
VENV="${AGILLM41_HF_BUCKET_VENV:-/workspace/hfbucket_venv}"
LOG="${AGILLM41_BUCKET_SYNC_LOG:-/workspace/agillm41_bucket_sync.log}"

# ---- Hugging Face credentials: token file contents with CR/LF stripped, under both variable names ----
HF_TOKEN="$(tr -d '\r\n' < /root/.cache/huggingface/token 2>/dev/null)"
HUGGING_FACE_HUB_TOKEN="$HF_TOKEN"
export HF_TOKEN HUGGING_FACE_HUB_TOKEN

# isolated HF/xet cache so we never race the AGILLM-4 uploader's cache cleanup
HF_HOME="${AGILLM41_BUCKET_HF_HOME:-/workspace/.hf_bucket_home}"
HF_XET_CACHE="$HF_HOME/xet"
export HF_HOME HF_XET_CACHE
mkdir -p "$HF_XET_CACHE"

# From here on, stdout and stderr of this script are appended to the log file.
exec >> "$LOG" 2>&1
echo "BUCKET_SYNC_LOOP_START $(date -u +%Y-%m-%dT%H:%M:%SZ) ckpts=$CKPT_DIR bucket=$BUCKET interval=${INTERVAL}s"
#######################################
# Refresh the code snapshot under $STAGE (the directory the main loop syncs to
# the bucket's code/ prefix).
# Every copy is best-effort: a source that does not exist is skipped and the
# function carries on with the remaining ones.
# Globals:   MAINLINE (read), STAGE (read)
# Arguments: none
# Returns:   0 (the final cleanup steps are guarded with `|| true`)
#######################################
stage_code() {
  # Keep loop/scratch variables local so a call never overwrites same-named globals.
  local f latest_infer_dir
  local status_dir=/workspace/agillm43_hf_status

  mkdir -p "$STAGE/agillm4" "$STAGE/distributed_infer" "$STAGE/public_join" \
    "$STAGE/federation" "$STAGE/inference" "$STAGE/inference/latest_run"

  # Top-level runtime files from the mainline checkout.
  cp -f "$MAINLINE/agillm41.py" "$STAGE/" 2>/dev/null || true
  cp -f "$MAINLINE/agillm43_diffusionblocks_independent.py" "$STAGE/" 2>/dev/null || true
  cp -f "$MAINLINE/DIFFUSIONBLOCKS_INDEPENDENT.md" "$STAGE/" 2>/dev/null || true

  # Package sub-trees and the agillm41_* helpers of the two sub-projects.
  cp -rf "$MAINLINE/agillm4/training_bench" "$MAINLINE/agillm4/ops" "$STAGE/agillm4/" 2>/dev/null || true
  cp -f "$MAINLINE"/distributed_infer/agillm41_*.py "$STAGE/distributed_infer/" 2>/dev/null || true
  cp -f "$MAINLINE"/public_join/agillm41_*.py "$STAGE/public_join/" 2>/dev/null || true

  # Status files, plus the files of the run directory named by the last
  # `latest_inference_dir=` line of the pointer file (if that directory exists).
  cp -f "$status_dir"/* "$STAGE/inference/" 2>/dev/null || true
  if [ -f "$status_dir/latest_inference.pointer" ]; then
    latest_infer_dir="$(sed -n 's/^latest_inference_dir=//p' "$status_dir/latest_inference.pointer" | tail -1)"
    if [ -n "$latest_infer_dir" ] && [ -d "$latest_infer_dir" ]; then
      cp -f "$latest_infer_dir"/* "$STAGE/inference/latest_run/" 2>/dev/null || true
    fi
  fi

  # Federation / ops scripts that live directly in /workspace (this script included).
  for f in \
    /workspace/agillm41_batch_planner.py \
    /workspace/agillm41_vast_side_cycle.sh \
    /workspace/side_cycle_watchdog.sh \
    /workspace/run_side_cycle.sh \
    /workspace/agillm41_hf_mirror.py \
    /workspace/agillm41_vast_side_update_puller.sh \
    /workspace/agillm41_lease_plan.py \
    /workspace/agillm41_lease_decide.py \
    /workspace/agillm41_bucket_sync_loop.sh; do
    if [ -f "$f" ]; then
      cp -f "$f" "$STAGE/federation/"
    fi
  done

  # Strip build artefacts and editor/backup copies from the snapshot.
  # -prune stops find from descending into a directory that is about to be removed.
  find "$STAGE" -name '__pycache__' -type d -prune -exec rm -rf -- {} + 2>/dev/null || true
  find "$STAGE" -type f \( -name '*.bak_*' -o -name '*.bak2_*' -o -name '*.bak3_*' -o -name '*.bak_fed_stage_*' \) -delete 2>/dev/null || true
}
#######################################
# Rebuild $CKPT_STAGE from scratch with the files to upload for the current
# checkpoint: latest.json, run_state.json, the checkpoint file named by
# latest.json's "path" entry, its sidecars (.upload.sha256, .sha256,
# .tokenizer.json) and golden_verified.pt / golden_verified.pt.json.
# Large files are hard-linked (cp -al) when possible, with a plain copy as
# fallback.
# If the checkpoint has no <name>.upload.sha256 marker yet, one is computed in
# $CKPT_DIR; a failed hash leaves no marker behind so it is retried next call.
# Globals:   CKPT_DIR (read; the marker is written there), CKPT_STAGE (recreated)
# Arguments: none
# Outputs:   BUCKET_SYNC_MARKER_HASH_{START,DONE,FAIL} lines on stdout
#######################################
stage_ckpt() {
  local latest marker suffix f

  rm -rf "$CKPT_STAGE"
  mkdir -p "$CKPT_STAGE"
  cp -f "$CKPT_DIR/latest.json" "$CKPT_STAGE/latest.json" 2>/dev/null || true
  cp -f "$CKPT_DIR/run_state.json" "$CKPT_STAGE/run_state.json" 2>/dev/null || true

  # Resolve the checkpoint name from the STAGED pointer, not the live one, so
  # the latest.json that gets uploaded always names the checkpoint staged with
  # it even if the live file is rewritten in the meantime. Only the basename of
  # "path" is used; any parse error yields an empty name.
  latest="$(python3 - "$CKPT_STAGE/latest.json" <<'PYCKPT' 2>/dev/null
import json, pathlib, sys
try:
    data=json.load(open(sys.argv[1]))
    print(pathlib.Path(data.get('path','')).name)
except Exception:
    pass
PYCKPT
)"

  if [ -n "$latest" ] && [ -f "$CKPT_DIR/$latest" ]; then
    cp -al "$CKPT_DIR/$latest" "$CKPT_STAGE/$latest" 2>/dev/null || cp -f "$CKPT_DIR/$latest" "$CKPT_STAGE/$latest"

    marker="$CKPT_DIR/$latest.upload.sha256"
    if [ ! -f "$marker" ]; then
      echo "BUCKET_SYNC_MARKER_HASH_START $(date -u +%Y-%m-%dT%H:%M:%SZ) $latest"
      # Write to a temp name and publish it only when hashing succeeded;
      # otherwise an empty/partial marker would be kept forever, because an
      # existing marker is never recomputed.
      if sha256sum "$CKPT_DIR/$latest" > "$marker.tmp" && mv "$marker.tmp" "$marker"; then
        echo "BUCKET_SYNC_MARKER_HASH_DONE $(date -u +%Y-%m-%dT%H:%M:%SZ) $latest"
      else
        rm -f "$marker.tmp"
        echo "BUCKET_SYNC_MARKER_HASH_FAIL $(date -u +%Y-%m-%dT%H:%M:%SZ) $latest"
      fi
    fi

    # Sidecar files travel with the checkpoint when they exist.
    for suffix in upload.sha256 sha256 tokenizer.json; do
      if [ -f "$CKPT_DIR/$latest.$suffix" ]; then
        cp -f "$CKPT_DIR/$latest.$suffix" "$CKPT_STAGE/$latest.$suffix"
      fi
    done
  fi

  for f in golden_verified.pt golden_verified.pt.json; do
    if [ -f "$CKPT_DIR/$f" ]; then
      cp -al "$CKPT_DIR/$f" "$CKPT_STAGE/$f" 2>/dev/null || cp -f "$CKPT_DIR/$f" "$CKPT_STAGE/$f"
    fi
  done
}
# Main loop: once per tick, re-read the token, rebuild both staging
# directories, push them to the bucket, trim the xet cache, then sleep.
# The loop never exits on its own; a failed sync is only logged.
while true; do
  echo "BUCKET_SYNC_TICK $(date -u +%Y-%m-%dT%H:%M:%SZ)"

  # Token is re-read on every tick (presumably so a rotated token is picked up
  # without restarting the loop - confirm with the operator).
  HF_TOKEN="$(tr -d '\r\n' < /root/.cache/huggingface/token 2>/dev/null)"
  export HF_TOKEN
  export HUGGING_FACE_HUB_TOKEN="$HF_TOKEN"

  stage_code
  stage_ckpt

  # Upload with the isolated venv's interpreter: the checkpoint stage goes to
  # <bucket>/ckpts (partial/tmp/uploading files excluded) and the code stage to
  # <bucket>/code. Both calls pass delete=False. On success one BUCKET_SYNC_OK
  # line is printed; a non-zero exit logs BUCKET_SYNC_ERR and the loop continues.
  "$VENV/bin/python" - "$CKPT_STAGE" "$STAGE" "$BUCKET" <<'PY' || echo "BUCKET_SYNC_ERR $(date -u +%H:%M:%S)"
import sys, time
from huggingface_hub import sync_bucket
ckpt, stage, bucket = sys.argv[1], sys.argv[2], sys.argv[3]
t=time.time()
rc=sync_bucket(ckpt, f"hf://buckets/{bucket}/ckpts", delete=False, exclude=["*.partial.*", "*.tmp", "*.uploading"])
rk=sync_bucket(stage, f"hf://buckets/{bucket}/code", delete=False)
import os, json
try: _st=str(json.load(open(os.path.join(ckpt,"latest.json"))).get("step",""))
except Exception: _st=""
print(f"BUCKET_SYNC_OK wall={time.time()-t:.1f}s ckpt_ops={len(getattr(rc,'operations',[]) or [])} code_ops={len(getattr(rk,'operations',[]) or [])} ckpt_step={_st}")
PY

  # Trim the xet shard-cache and staging areas after each tick. The :? guard
  # aborts instead of globbing from "/" should HF_XET_CACHE ever be empty.
  rm -rf "${HF_XET_CACHE:?}"/*/shard-cache/* "${HF_XET_CACHE:?}"/*/staging/* 2>/dev/null || true

  # If INTERVAL is not something sleep accepts, sleep fails immediately; fall
  # back to the 600s default so the loop cannot spin and hammer the Hub.
  sleep "$INTERVAL" || sleep 600
done

Xet Storage Details

Size:
6.32 kB
·
Xet hash:
ec7f3d6dde827fa98b641c1bee9fbaeca84c7c707b9218e0d19fd21a79274738

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.