Spaces:
Paused
Paused
File size: 7,657 Bytes
9ecdee6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# webdav_sync.py
# -*- coding: utf-8 -*-
"""
WebDAV 同步器:启动时从远端拉取 history.json,随后按固定间隔(默认 30 分钟,见 WEBDAV_SYNC_INTERVAL)将本地变动回传到远端。
- 仅依赖 requests
- 支持 Basic Auth
- 尽量避免同一主机上的多个进程重复同步:通过本地文件锁(/tmp/webdav_sync.lock)只让一个进程启动同步线程;该锁无法在不同主机/容器上的多个实例之间互斥
环境变量(至少配置 WEBDAV_URL):
WEBDAV_URL : 远端文件的完整 URL(例如 https://dav.example.com/path/history.json)
WEBDAV_USERNAME : (可选)用户名
WEBDAV_PASSWORD : (可选)密码
WEBDAV_VERIFY_SSL : (可选)默认为 true;设为 "false" 可跳过证书验证
WEBDAV_SYNC_INTERVAL : (可选)同步间隔秒,默认 1800(30 分钟)
WEBDAV_SYNC_ENABLED : (可选)默认启用;设为 "false" 可关闭
"""
import os
import time
import json
import hashlib
import threading
from contextlib import contextmanager
from typing import Optional
import requests
# Seconds between background push attempts when WEBDAV_SYNC_INTERVAL is not set.
DEFAULT_INTERVAL = 30 * 60  # 30 minutes
# Cross-process lock file consulted by WebDavSyncer.start() before it launches the sync thread.
LOCKFILE_PATH = "/tmp/webdav_sync.lock"
def _bool_env(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment variable *name*.

    Returns *default* when the variable is unset or blank (empty / whitespace only).
    Otherwise returns True when the value is one of "1", "true", "yes", "on"
    (case-insensitive, surrounding whitespace ignored) and False for anything else.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        # A blank value (e.g. ``WEBDAV_VERIFY_SSL=`` left empty in a deployment template)
        # previously read as False and silently disabled TLS verification; treat it as unset.
        return default
    return raw.strip().lower() in ("1", "true", "yes", "on")
def _int_env(name: str, default: int) -> int:
    """Parse the environment variable *name* as an integer.

    Falls back to *default* when the variable is unset or is not a valid
    integer literal (``int()`` raises ValueError).
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        parsed = int(raw)
    except ValueError:
        return default
    return parsed
def _sha1_of_file(path: str) -> Optional[str]:
    """Return the hex SHA-1 digest of the file at *path*.

    Returns None when *path* is not a regular file, including when the file
    disappears between the existence check and the read. The digest is compared
    by this module's sync code to detect content changes; it is not used for security.
    """
    if not os.path.isfile(path):
        return None
    digest = hashlib.sha1()
    try:
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                digest.update(chunk)
    except FileNotFoundError:
        # The file can be removed between isfile() and open() (e.g. deleted and
        # recreated by the application); report "no file" instead of raising.
        return None
    return digest.hexdigest()
@contextmanager
def _interprocess_lock(path: str):
    """Simple file lock based on exclusive creation of *path*.

    Yields True when this call created the lock file (and removes it on exit),
    or False when the file already exists; contention never raises. Exceptions
    raised inside the ``with`` body always propagate unchanged.
    """
    try:
        # O_CREAT | O_EXCL fails with FileExistsError if the file already exists.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
    except FileExistsError:
        fd = None
    if fd is None:
        # Yield outside any except-handler so errors from the with-body are not
        # swallowed or chained onto the FileExistsError.
        yield False
        return
    try:
        os.write(fd, str(os.getpid()).encode("utf-8"))
        yield True
    finally:
        # Close and unlink independently so a failing close() cannot leave the
        # lock file behind.
        try:
            os.close(fd)
        except OSError:
            pass
        try:
            os.unlink(path)
        except OSError:
            pass
class WebDavSyncer:
    """Pull one remote file over WebDAV at startup, then periodically push local changes back.

    Configuration is read from the WEBDAV_* environment variables when the instance
    is created. Sync is enabled only if WEBDAV_SYNC_ENABLED is truthy AND WEBDAV_URL
    is non-empty; otherwise every operation is a logged no-op.
    """

    def __init__(self, local_path: str):
        self.local_path = local_path
        self.url = os.environ.get("WEBDAV_URL", "").strip()
        self.username = os.environ.get("WEBDAV_USERNAME", "")
        self.password = os.environ.get("WEBDAV_PASSWORD", "")
        self.verify_ssl = _bool_env("WEBDAV_VERIFY_SSL", True)
        interval = _int_env("WEBDAV_SYNC_INTERVAL", DEFAULT_INTERVAL)
        if interval <= 0:
            # Event.wait() with a zero/negative timeout returns immediately, which would
            # turn the background loop into a busy loop re-hashing the file nonstop.
            print(f"[webdav-sync] Invalid WEBDAV_SYNC_INTERVAL={interval}; using {DEFAULT_INTERVAL}s.")
            interval = DEFAULT_INTERVAL
        self.interval = interval
        self.enabled = _bool_env("WEBDAV_SYNC_ENABLED", True) and bool(self.url)
        self._lock = threading.Lock()  # serializes pushes within this process
        self._stop = threading.Event()  # set by stop() to end the background loop
        self._thread = None  # background push thread, once start() has launched it
        self._last_pushed_hash = None  # SHA-1 of the content last pulled from / pushed to remote
        self._lock_fd = None  # open descriptor holding the cross-process flock, if owned

    def _session(self) -> requests.Session:
        """Build a requests session with optional Basic Auth, TLS setting and User-Agent."""
        s = requests.Session()
        if self.username:
            s.auth = (self.username, self.password)
        s.verify = self.verify_ssl
        s.headers.update({"User-Agent": "history-sync/1.0"})
        return s

    # ====== Pull (run once at startup) ======
    def initial_pull(self):
        """Download the remote file once and overwrite the local copy with it.

        Best effort: network errors, HTTP errors and local write errors are printed and
        swallowed. A missing remote file (404) leaves the local file untouched. Content
        that is not valid JSON is still written (with a warning).
        """
        if not self.enabled:
            print("[webdav-sync] Disabled or missing WEBDAV_URL; skip initial pull.")
            return
        try:
            with self._session() as s:
                # Probe with HEAD first so a missing remote is reported without a download.
                h = s.head(self.url, timeout=10)
                if h.status_code == 404:
                    print(f"[webdav-sync] Remote not found (404): {self.url}. Skip pull.")
                    return
                if h.status_code >= 400:
                    print(f"[webdav-sync] HEAD failed: {h.status_code} {h.text[:200]}")
                    return
                r = s.get(self.url, timeout=30)
                if r.status_code == 404:
                    print(f"[webdav-sync] Remote not found on GET: {self.url}.")
                    return
                r.raise_for_status()
                content = r.content
        except requests.RequestException as e:
            print(f"[webdav-sync] Initial pull failed: {e}")
            return
        # Sanity check only; non-JSON content is still written.
        try:
            json.loads(content.decode("utf-8"))
        except Exception:
            print("[webdav-sync] Warning: remote content is not valid JSON; still writing as-is.")
        try:
            self._write_local_atomically(content)
        except OSError as e:
            # Local write problems used to escape and crash application startup.
            print(f"[webdav-sync] Initial pull failed: {e}")
            return
        # Hash the bytes actually written, so the baseline matches the remote exactly.
        self._last_pushed_hash = hashlib.sha1(content).hexdigest()
        print(f"[webdav-sync] Pulled remote -> {self.local_path}")

    def _write_local_atomically(self, content: bytes) -> None:
        """Write *content* to local_path via a temporary sibling file and os.replace().

        Readers see either the previous file or the complete new one, and a failed write
        (e.g. disk full) leaves the previous file intact instead of truncated.
        """
        os.makedirs(os.path.dirname(self.local_path) or ".", exist_ok=True)
        # Resolve symlinks so a linked file is replaced at its real location rather than
        # the link itself being swapped for a regular file.
        target = os.path.realpath(self.local_path)
        tmp_path = f"{target}.{os.getpid()}.tmp"
        try:
            with open(tmp_path, "wb") as f:
                f.write(content)
            os.replace(tmp_path, target)
        except BaseException:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

    # ====== Push (periodic upload of local changes) ======
    def push_if_changed(self):
        """PUT the local file to the remote URL if its content changed since the last sync.

        The file is read once and exactly the hashed bytes are uploaded, so
        ``_last_pushed_hash`` always describes what the remote received. On a network
        error or an HTTP status >= 400 the hash is left unchanged so the next call retries.
        """
        if not self.enabled:
            return
        if not os.path.isfile(self.local_path):
            # No local file: nothing to push.
            return
        try:
            with open(self.local_path, "rb") as f:
                payload = f.read()
        except FileNotFoundError:
            # Removed between the isfile() check and open().
            return
        current_hash = hashlib.sha1(payload).hexdigest()
        if current_hash == self._last_pushed_hash:
            return
        try:
            with self._session() as s:
                # Sending bytes gives an explicit Content-Length (also for an empty file).
                r = s.put(self.url, data=payload, timeout=30)
                if r.status_code >= 400:
                    print(f"[webdav-sync] PUT failed: {r.status_code} {r.text[:200]}")
                    return
            self._last_pushed_hash = current_hash
            print(f"[webdav-sync] Pushed local -> {self.url}")
        except requests.RequestException as e:
            print(f"[webdav-sync] Push failed: {e}")

    def _loop(self):
        """Background thread body: push (under self._lock) every `interval` seconds until stopped."""
        while not self._stop.is_set():
            try:
                with self._lock:
                    self.push_if_changed()
            except Exception as e:
                # Keep the thread alive on unexpected errors (e.g. local permission problems).
                print(f"[webdav-sync] Loop error: {e}")
            self._stop.wait(self.interval)

    def _acquire_process_lock(self) -> bool:
        """Take a non-blocking exclusive flock on LOCKFILE_PATH and keep it until released.

        Returns True when this process may run the sync thread. The lock lives as long as
        the descriptor stays open and the kernel drops it when the process exits, so a crash
        cannot leave a stale lock behind. Where locking is impossible (no fcntl module, lock
        file cannot be opened) a message is printed and True is returned, so sync still runs.
        """
        if self._lock_fd is not None:
            return True
        try:
            import fcntl  # POSIX only
        except ImportError:
            print("[webdav-sync] fcntl unavailable; proceeding without cross-process guard.")
            return True
        try:
            fd = os.open(LOCKFILE_PATH, os.O_CREAT | os.O_RDWR, 0o600)
        except OSError as e:
            print(f"[webdav-sync] cannot open lock file {LOCKFILE_PATH}: {e}; proceeding without cross-process guard.")
            return True
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # Another descriptor (normally another process) already holds the lock.
            os.close(fd)
            return False
        # Record the owner PID for humans inspecting the file; failures here are harmless.
        try:
            os.ftruncate(fd, 0)
            os.write(fd, str(os.getpid()).encode("utf-8"))
        except OSError:
            pass
        self._lock_fd = fd
        return True

    def _release_process_lock(self) -> None:
        """Close the lock descriptor (releasing the flock) if this instance holds it.

        The lock file itself is left in place on purpose: unlinking it could let two
        processes end up locking different inodes under the same path.
        """
        fd, self._lock_fd = self._lock_fd, None
        if fd is None:
            return
        try:
            os.close(fd)
        except OSError:
            pass

    def start(self):
        """Launch the background push thread.

        Does nothing when sync is disabled, when this instance's thread is already
        running, or when another holder of the host-wide lock is already syncing. The
        lock is held for the lifetime of the thread (released by stop() or process exit),
        not just while the thread is being created.
        """
        if not self.enabled:
            print("[webdav-sync] disabled; not starting background sync.")
            return
        if self._thread is not None and self._thread.is_alive():
            return
        if not self._acquire_process_lock():
            print("[webdav-sync] another process is already syncing; skip starting thread.")
            return
        try:
            # Baseline: the current local content counts as already in sync.
            self._last_pushed_hash = _sha1_of_file(self.local_path)
            self._stop.clear()  # allow start() again after an earlier stop()
            self._thread = threading.Thread(target=self._loop, name="WebDavSyncer", daemon=True)
            self._thread.start()
        except BaseException:
            self._release_process_lock()
            raise

    def stop(self, flush: bool = False):
        """Stop the background thread (waiting up to 3 s) and release the process lock.

        :param flush: when True and this instance started a sync thread, push any pending
            local change one final time before returning. Defaults to False (old behavior).
        """
        self._stop.set()
        thread = self._thread
        if thread and thread.is_alive():
            thread.join(timeout=3)
        if flush and thread is not None:
            try:
                with self._lock:
                    self.push_if_changed()
            except Exception as e:
                print(f"[webdav-sync] Final push error: {e}")
        if thread is None or not thread.is_alive():
            # If a slow push is still running, keep the lock; process exit releases it.
            self._release_process_lock()
# ====== 供应用调用的便捷入口 ======
def start_webdav_sync_if_configured(local_history_path: str):
    """Create a WebDavSyncer for *local_history_path*, pull once, then start periodic pushes.

    Intended to be called once during application startup:
      1. ``initial_pull()`` downloads the remote file and overwrites the local copy;
      2. ``start()`` launches the background thread that pushes local changes each interval.
    Both steps only log and return when sync is disabled or WEBDAV_URL is unset.

    Returns the syncer so the caller can keep a reference and call ``stop()`` at shutdown.
    """
    syncer = WebDavSyncer(local_history_path)
    # NOTE(review): every process that calls this pulls and overwrites the local file, even
    # when another process already runs the sync thread; confirm this is safe for
    # multi-worker deployments (a late-starting worker could clobber unpushed local changes).
    bootstrap_steps = (syncer.initial_pull, syncer.start)
    for step in bootstrap_steps:
        step()
    return syncer