# Spaces: Running | File size: 4,071 Bytes | f120063
import os
import sys
import time
import threading
from pathlib import Path
from huggingface_hub import sync_bucket
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
def _env_int(name: str, default: int, minimum: int = 0) -> int:
    """Read an integer setting from the environment, falling back to ``default``.

    Args:
        name: Environment variable to read.
        default: Value used when the variable is unset, blank, not an
            integer, or below ``minimum``.
        minimum: Smallest accepted value.

    Returns:
        The parsed integer, or ``default`` when the value is unusable.
    """
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    try:
        value = int(raw)
    except ValueError:
        value = None
    if value is None or value < minimum:
        # log() is defined further down the module, so print directly here
        # using the same prefix.
        print(
            f"[hf-bucket] ignoring invalid {name}={raw!r}; using default {default}",
            flush=True,
        )
        return default
    return value


# Bucket identifier interpolated into the hf://buckets/... URI.
BUCKET_REPO = os.environ.get("HF_BUCKET_REPO", "").strip()
# HF_BUCKET_TOKEN takes precedence; HF_TOKEN is the fallback.
BUCKET_TOKEN = (
    os.environ.get("HF_BUCKET_TOKEN", "").strip()
    or os.environ.get("HF_TOKEN", "").strip()
)
# Local directory that is mirrored to / from the bucket.
LOCAL_ROOT = Path(os.environ.get("HF_BUCKET_LOCAL_DIR", "/data/qwen2api")).resolve()
# Sub-path inside the bucket; leading/trailing slashes are removed.
REMOTE_ROOT = os.environ.get("HF_BUCKET_REMOTE_DIR", "runtime").strip("/")
# Seconds between periodic pushes. Must be >= 1: zero would make the periodic
# loop spin and a negative value makes time.sleep() raise.
SYNC_INTERVAL = _env_int("HF_BUCKET_SYNC_INTERVAL", 300, minimum=1)
# Seconds after handler creation during which file events are ignored.
STARTUP_GRACE_SECONDS = _env_int("HF_BUCKET_STARTUP_GRACE_SECONDS", 30)
# Delay, in seconds, between the last file event and the push it triggers.
SYNC_DEBOUNCE_SECONDS = _env_int("HF_BUCKET_SYNC_DEBOUNCE_SECONDS", 5)
def log(message: str) -> None:
    """Print ``message`` to stdout with the ``[hf-bucket]`` prefix and flush."""
    tagged = f"[hf-bucket] {message}"
    # Flush on every call so the line is written out immediately.
    print(tagged, flush=True)
def bucket_path() -> str:
    """Return the ``hf://buckets/...`` URI for the remote side of a sync.

    ``REMOTE_ROOT`` is appended as a sub-path when it is non-empty; otherwise
    the URI ends at ``BUCKET_REPO``.
    """
    if not REMOTE_ROOT:
        return f"hf://buckets/{BUCKET_REPO}"
    return f"hf://buckets/{BUCKET_REPO}/{REMOTE_ROOT}"
def ensure_local_root() -> None:
    """Create ``LOCAL_ROOT`` and any missing parents; existing directories are accepted."""
    os.makedirs(LOCAL_ROOT, exist_ok=True)
def can_use_bucket() -> bool:
    """Report whether both the bucket repo and an access token are configured.

    The repo is checked before the token. For the first missing setting the
    reason is logged and ``False`` is returned; ``True`` means both are set.
    """
    skip_reason = None
    if not BUCKET_REPO:
        skip_reason = "skip: HF_BUCKET_REPO 未设置"
    elif not BUCKET_TOKEN:
        skip_reason = "skip: HF_TOKEN 或 HF_BUCKET_TOKEN 未设置"

    if skip_reason is None:
        return True
    log(skip_reason)
    return False
def restore() -> None:
    """Sync from the bucket URI into ``LOCAL_ROOT`` and log the outcome.

    Returns without syncing when the bucket is not configured. Any exception
    raised during the sync is logged instead of propagated.
    """
    if can_use_bucket():
        ensure_local_root()
        try:
            source = bucket_path()
            destination = str(LOCAL_ROOT)
            sync_bucket(source, destination, token=BUCKET_TOKEN)
            log(f"restore 完成: {bucket_path()} -> {LOCAL_ROOT}")
        except Exception as exc:
            log(f"restore 失败: {exc}")
# push() is invoked from the debounce timer thread and from the periodic
# thread started by daemon(); this lock keeps their syncs from overlapping.
_PUSH_LOCK = threading.Lock()


def push() -> None:
    """Sync ``LOCAL_ROOT`` to the bucket URI and log the outcome.

    Returns without syncing when the bucket is not configured. Any exception
    raised during the sync is logged instead of propagated, so a failed
    upload does not stop the calling thread. Concurrent calls are serialised:
    a second caller waits until the running sync has finished.
    """
    if not can_use_bucket():
        return
    ensure_local_root()
    with _PUSH_LOCK:
        try:
            # NOTE(review): delete=False presumably keeps remote files that no
            # longer exist locally - confirm against the huggingface_hub docs.
            sync_bucket(str(LOCAL_ROOT), bucket_path(), token=BUCKET_TOKEN, delete=False)
            log(f"sync 完成: {LOCAL_ROOT} -> {bucket_path()}")
        except Exception as exc:
            log(f"sync 失败: {exc}")
def daemon() -> None:
    """Watch ``LOCAL_ROOT`` and push it to the bucket on changes and on a timer.

    Returns immediately when the bucket is not configured. Otherwise it:

    * starts a recursive watchdog observer on ``LOCAL_ROOT`` whose file events
      schedule a ``push()`` after ``SYNC_DEBOUNCE_SECONDS`` of quiet;
    * starts a daemon thread that calls ``push()`` every ``SYNC_INTERVAL``
      seconds as a fallback;
    * blocks until interrupted with ``KeyboardInterrupt``, then stops and
      joins the observer.
    """
    if not can_use_bucket():
        return

    # restore() and push() create the directory before using it; do the same
    # here so the observer is never scheduled on a missing path.
    ensure_local_root()

    class ChangeHandler(FileSystemEventHandler):
        """Collapse bursts of file events into a single delayed ``push()``."""

        def __init__(self) -> None:
            super().__init__()
            self._timer = None
            self._lock = threading.Lock()
            # Monotonic clock: the grace period measures elapsed time and must
            # not be affected by wall-clock adjustments.
            self._startup_time = time.monotonic()

        def _schedule(self) -> None:
            # Events inside the startup grace window are dropped, not deferred;
            # the periodic thread picks those changes up later.
            if time.monotonic() - self._startup_time < STARTUP_GRACE_SECONDS:
                return
            with self._lock:
                # Restart the countdown so only the last event of a burst
                # triggers a push.
                if self._timer is not None:
                    self._timer.cancel()
                self._timer = threading.Timer(SYNC_DEBOUNCE_SECONDS, push)
                self._timer.daemon = True
                self._timer.start()

        def _on_file_event(self, event) -> None:
            # Directory events are ignored; only file events schedule a push.
            if not event.is_directory:
                self._schedule()

        def on_created(self, event):
            self._on_file_event(event)

        def on_modified(self, event):
            self._on_file_event(event)

        def on_deleted(self, event):
            self._on_file_event(event)

        def on_moved(self, event):
            self._on_file_event(event)

    observer = Observer()
    observer.schedule(ChangeHandler(), str(LOCAL_ROOT), recursive=True)
    observer.start()
    log(
        f"后台同步已启动,监听目录 {LOCAL_ROOT},即时同步防抖 {SYNC_DEBOUNCE_SECONDS} 秒,定时兜底 {SYNC_INTERVAL} 秒"
    )

    def periodic_sync() -> None:
        # Fallback in case file events are missed or dropped.
        while True:
            time.sleep(SYNC_INTERVAL)
            push()

    periodic_thread = threading.Thread(target=periodic_sync, daemon=True)
    periodic_thread.start()

    try:
        # Keep the main thread alive; the work happens in the other threads.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop; fall through to the cleanup.
        pass
    finally:
        observer.stop()
        observer.join()
def main() -> int:
    """Run the sub-command named by ``sys.argv[1]`` (``daemon`` when absent).

    Returns:
        0 after running ``restore``, ``push`` or ``daemon``; 1 after logging
        an unknown command.
    """
    argv = sys.argv
    command = "daemon" if len(argv) <= 1 else argv[1]

    handlers = {"restore": restore, "push": push, "daemon": daemon}
    handler = handlers.get(command)
    if handler is None:
        log(f"未知命令: {command}")
        return 1

    handler()
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())