File size: 2,200 Bytes
d93804e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env bash
# Unset variables are errors and a pipeline fails if any stage fails.
# errexit (-e) is not enabled, so individual command failures do not abort
# the script.
set -uo pipefail

# Four positional arguments are mandatory; the fifth (poll interval) is optional.
if (( $# < 4 )); then
  echo "usage: $0 <repo_id> <checkpoint_root> <status_log> <state_file> [interval_s]" >&2
  exit 1
fi

REPO_ID="$1"          # Hub repository that receives the checkpoints
CHECKPOINT_ROOT="$2"  # local directory holding one sub-directory per step
STATUS_LOG="$3"       # append-only log file written by log()
STATE_FILE="$4"       # one archived step per line
INTERVAL_S="${5:-120}" # value handed to sleep between polls (default 120)

# Make sure the parent directories of the log and state files exist, and that
# the state file itself exists before it is first read.
mkdir -p "$(dirname "$STATUS_LOG")" "$(dirname "$STATE_FILE")"
touch "$STATE_FILE"

# Append one line to STATUS_LOG in the form "<UTC timestamp> | <message>".
# Globals:   STATUS_LOG (read; file is appended to)
# Arguments: the message words, joined into a single string
log() {
  local stamp
  stamp="$(date -u +%FT%TZ)"
  printf '%s | %s\n' "$stamp" "$*" >> "$STATUS_LOG"
}

# Report whether a step is already recorded in the state file.
# Globals:   STATE_FILE (read)
# Arguments: $1 - step name to look up
# Returns:   0 if a line of STATE_FILE equals $1 exactly, non-zero otherwise
#            (including when the state file cannot be read).
already_done() {
  # -F: treat the step as a literal string, not a regex (so "1.0" cannot
  #     match "100"); -x: whole-line match; --: a step starting with '-' is
  #     never parsed as an option.
  grep -qxF -- "$1" "$STATE_FILE" 2>/dev/null
}

# Record a step as archived by appending it as a new line to the state file.
# Globals:   STATE_FILE (appended to)
# Arguments: $1 - step name
mark_done() {
  local finished_step="$1"
  printf '%s\n' "$finished_step" >> "$STATE_FILE"
}

# Check that a local checkpoint directory contains both metadata files this
# script looks for before uploading.
# Globals:   CHECKPOINT_ROOT (read)
# Arguments: $1 - step name (sub-directory of CHECKPOINT_ROOT)
# Returns:   0 when both files exist as regular files, 1 otherwise
checkpoint_ready() {
  local ckpt_dir="${CHECKPOINT_ROOT}/$1"
  local marker
  for marker in "_CHECKPOINT_METADATA" "params/_METADATA"; do
    [[ -f "${ckpt_dir}/${marker}" ]] || return 1
  done
  return 0
}

# Confirm that a step's two metadata files can be downloaded back from the
# remote repository.
# Globals:   REPO_ID (read)
#            HF_WORK_ROOT (read, optional) - parent of the scratch directory;
#              defaults to /workspace
# Arguments: $1 - step name
# Returns:   0 only if BOTH downloads succeed and both files are present in
#            the scratch directory; 1 otherwise.
verify_remote_step() {
  local step="$1"
  local work_root="${HF_WORK_ROOT:-/workspace}"
  local verify_dir="${work_root}/hf_verify_${step}"
  local rel
  local status=0

  # Start from an empty scratch directory so stale files from an earlier
  # attempt cannot make the check pass.
  rm -rf -- "$verify_dir"
  mkdir -p -- "$verify_dir" || return 1

  for rel in "${step}/_CHECKPOINT_METADATA" "${step}/params/_METADATA"; do
    # Every download is checked; errexit is not active in this script, so an
    # unchecked failure here would otherwise be silently ignored.
    if ! hf download "$REPO_ID" "$rel" --type model --local-dir "$verify_dir" >/dev/null; then
      status=1
      break
    fi
    # Assumes the CLI keeps the repository-relative path below --local-dir.
    if [[ ! -f "${verify_dir}/${rel}" ]]; then
      status=1
      break
    fi
  done

  # The scratch copy is only needed for the check itself.
  rm -rf -- "$verify_dir"
  return "$status"
}

# Upload one checkpoint step to the remote repository and, only after the
# upload has been verified, delete the local copy and record the step as done.
# Globals:   CHECKPOINT_ROOT, REPO_ID, STATUS_LOG (read)
#            HF_WORK_ROOT (read, optional) - parent of the staging directory;
#              defaults to /workspace
# Arguments: $1 - step name (sub-directory of CHECKPOINT_ROOT)
# Returns:   0 if the step was archived or its local directory is missing;
#            1 if staging, upload or verification failed. On failure the
#            local checkpoint is left untouched and the step is NOT marked
#            done, so a later poll can retry it.
archive_step() {
  local step="$1"
  local src="${CHECKPOINT_ROOT}/${step}"
  local work_root="${HF_WORK_ROOT:-/workspace}"
  local stage="${work_root}/hf_stage_${step}"

  if [[ ! -d "$src" ]]; then
    log "skip step=${step} missing_local_dir"
    return 0
  fi

  # Stage a hard-linked copy so the uploaded folder contains "<step>/..." at
  # its top level. Hard links require source and stage on one filesystem.
  rm -rf -- "$stage"
  if ! mkdir -p -- "$stage" || ! cp -al -- "$src" "$stage/"; then
    log "stage_failed step=${step}"
    rm -rf -- "$stage"
    return 1
  fi

  log "upload_start step=${step}"
  # errexit is not active, so each step is checked explicitly: the local
  # checkpoint must never be deleted unless upload AND verification succeed.
  if ! hf upload-large-folder "$REPO_ID" "$stage" --type model --num-workers 8 --no-bars >> "$STATUS_LOG" 2>&1; then
    rm -rf -- "$stage"
    return 1
  fi

  if ! verify_remote_step "$step"; then
    log "verify_failed step=${step}"
    rm -rf -- "$stage"
    return 1
  fi

  rm -rf -- "$src" "$stage"
  mark_done "$step"
  log "upload_done step=${step}"
}

log "archiver_start repo=${REPO_ID} root=${CHECKPOINT_ROOT}"

# Poll forever: on each pass, walk the purely numeric sub-directories of
# CHECKPOINT_ROOT in ascending numeric order and archive every step that is
# not yet recorded as done and whose metadata files are present.
while true; do
  if [[ ! -d "$CHECKPOINT_ROOT" ]]; then
    log "checkpoint_root_missing"
    sleep "$INTERVAL_S"
    continue
  fi

  while IFS= read -r step; do
    [[ -z "$step" ]] && continue
    if already_done "$step"; then
      continue
    fi
    if ! checkpoint_ready "$step"; then
      log "skip step=${step} checkpoint_not_ready"
      continue
    fi
    # stdin comes from /dev/null so nothing run by archive_step can consume
    # the remaining step names that this loop reads from its own stdin.
    archive_step "$step" </dev/null || log "upload_failed step=${step}"
  done < <(
    # grep -E replaces ripgrep here: same anchored filter, no extra tool.
    find "$CHECKPOINT_ROOT" -maxdepth 1 -mindepth 1 -type d -printf '%f\n' \
      | grep -E '^[0-9]+$' \
      | sort -n
  )

  sleep "$INTERVAL_S"
done