# webdav_sync.py
# -*- coding: utf-8 -*-
"""
WebDAV syncer: pull history.json from the remote once at start-up, then push
local changes back to the remote every 30 minutes (configurable).

- Depends only on requests (plus the standard library).
- Supports HTTP Basic Auth.
- Avoids duplicate syncing across processes/instances: an OS-level lock on a
  lock file is held for as long as the sync thread runs, so at most one
  process sharing that lock file pushes at a time. The OS drops the lock
  automatically when the holding process exits, so no stale lock remains.

Environment variables (at least WEBDAV_URL must be set):
    WEBDAV_URL           : full URL of the remote file
                           (e.g. https://dav.example.com/path/history.json)
    WEBDAV_USERNAME      : (optional) user name
    WEBDAV_PASSWORD      : (optional) password
    WEBDAV_VERIFY_SSL    : (optional) defaults to true; set to "false" to skip
                           certificate verification
    WEBDAV_SYNC_INTERVAL : (optional) sync interval in seconds, default 1800
                           (30 minutes); non-positive values use the default
    WEBDAV_SYNC_ENABLED  : (optional) enabled by default; set to "false" to
                           disable

Boolean variables accept 1/true/yes/on and 0/false/no/off (case-insensitive);
an empty or unrecognized value falls back to that variable's default.
"""
import os
import time
import json
import hashlib
import tempfile
import threading
from contextlib import contextmanager, suppress
from typing import Optional

try:  # POSIX: whole-file advisory locks, released by the OS when the process dies
    import fcntl
except ImportError:  # Windows: fall back to msvcrt byte-range locks
    fcntl = None
    import msvcrt

import requests

DEFAULT_INTERVAL = 1800  # 30 minutes
# Inter-process lock file: only the process holding its lock runs the sync
# thread. gettempdir() is /tmp on typical Linux hosts and also works on
# platforms that have no /tmp.
LOCKFILE_PATH = os.path.join(tempfile.gettempdir(), "webdav_sync.lock")

_TRUE_VALUES = ("1", "true", "yes", "on")
_FALSE_VALUES = ("0", "false", "no", "off")


def _bool_env(name: str, default: bool) -> bool:
    """Read a boolean environment variable.

    Returns True for 1/true/yes/on and False for 0/false/no/off
    (case-insensitive, surrounding whitespace ignored). An unset, empty or
    unrecognized value returns *default*, so e.g. an empty
    ``WEBDAV_VERIFY_SSL=`` does not silently disable certificate checks.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    v = raw.strip().lower()
    if v in _TRUE_VALUES:
        return True
    if v in _FALSE_VALUES:
        return False
    if v:
        print(f"[webdav-sync] Warning: unrecognized value for {name}: {raw!r}; using default {default}.")
    return default


def _int_env(name: str, default: int) -> int:
    """Read an integer environment variable; unset or non-integer -> *default*."""
    v = os.environ.get(name)
    try:
        return int(v) if v is not None else default
    except ValueError:
        return default


def _sha1_of_file(path: str) -> Optional[str]:
    """Return the hex SHA-1 of the file at *path*, or None if it is not a regular file."""
    if not os.path.isfile(path):
        return None
    h = hashlib.sha1()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


def _try_acquire_lock(path: str) -> Optional[int]:
    """Try to take an exclusive, non-blocking OS-level lock on *path*.

    Returns the open file descriptor that holds the lock (the lock lasts as
    long as the descriptor stays open), or None if another holder has the
    lock or the lock file cannot be opened. The file is never deleted:
    unlinking a lock file that others may have open would let two processes
    lock two different inodes at the same time.
    """
    try:
        fd = os.open(path, os.O_CREAT | os.O_RDWR, 0o600)
    except OSError as e:
        print(f"[webdav-sync] cannot open lock file {path}: {e}")
        return None
    try:
        if fcntl is not None:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        else:
            msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)  # locks byte 0 (fd is at offset 0)
    except OSError:
        os.close(fd)
        return None
    # Record the holder's PID for humans inspecting the file; the OS lock,
    # not the file content, is what actually excludes other processes.
    with suppress(OSError):
        os.ftruncate(fd, 0)
        os.write(fd, str(os.getpid()).encode("utf-8"))
    return fd


def _release_lock(fd: int) -> None:
    """Release a lock obtained from _try_acquire_lock and close its descriptor."""
    try:
        if fcntl is not None:
            fcntl.flock(fd, fcntl.LOCK_UN)
        else:
            os.lseek(fd, 0, os.SEEK_SET)  # msvcrt unlocks from the current offset
            msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
    except OSError:
        pass  # closing the descriptor below releases the lock anyway
    finally:
        with suppress(OSError):
            os.close(fd)


@contextmanager
def _interprocess_lock(path: str):
    """Hold the inter-process lock at *path* for the duration of a ``with`` block.

    Yields True if the lock was acquired and False if another holder has it
    (or the lock file cannot be opened); a busy lock never raises. Exceptions
    raised inside the ``with`` body propagate unchanged.
    """
    fd = _try_acquire_lock(path)
    try:
        yield fd is not None
    finally:
        if fd is not None:
            _release_lock(fd)


class WebDavSyncer:
    """Pull a remote WebDAV file once and push local changes periodically.

    Configuration is read from the WEBDAV_* environment variables (see the
    module docstring). ``enabled`` is False when WEBDAV_SYNC_ENABLED is false
    or WEBDAV_URL is empty; the pull/push/start methods then do nothing.
    """

    def __init__(self, local_path: str):
        self.local_path = local_path
        self.url = os.environ.get("WEBDAV_URL", "").strip()
        self.username = os.environ.get("WEBDAV_USERNAME", "")
        self.password = os.environ.get("WEBDAV_PASSWORD", "")
        self.verify_ssl = _bool_env("WEBDAV_VERIFY_SSL", True)
        interval = _int_env("WEBDAV_SYNC_INTERVAL", DEFAULT_INTERVAL)
        # A non-positive interval would make _loop spin without ever sleeping.
        self.interval = interval if interval > 0 else DEFAULT_INTERVAL
        self.enabled = _bool_env("WEBDAV_SYNC_ENABLED", True) and bool(self.url)

        self._lock = threading.Lock()  # serializes pushes (background loop vs. stop())
        self._stop = threading.Event()  # set by stop() to end _loop
        self._thread: Optional[threading.Thread] = None  # background push thread, if started
        self._lock_fd: Optional[int] = None  # descriptor holding the inter-process lock
        # SHA-1 of the content last pulled/pushed (or the baseline taken in start()).
        self._last_pushed_hash: Optional[str] = None

    def _session(self) -> requests.Session:
        """Build a requests Session with auth, TLS verification and User-Agent set."""
        s = requests.Session()
        if self.username:
            s.auth = (self.username, self.password)
        s.verify = self.verify_ssl
        s.headers.update({"User-Agent": "history-sync/1.0"})
        return s

    # ====== Pull (run once at start-up) ======
    def initial_pull(self):
        """Download the remote file once and replace the local copy with it.

        Best-effort: a missing remote (404), HTTP/network errors and local
        write errors are logged, and the local file is then left as it was.
        Content that is not valid JSON is still written, with a warning.

        NOTE(review): the local file is overwritten unconditionally. If one
        process calls this while another process has local changes that are
        not pushed yet, those changes are lost -- confirm start-up ordering
        in the caller.
        """
        if not self.enabled:
            print("[webdav-sync] Disabled or missing WEBDAV_URL; skip initial pull.")
            return
        try:
            # GET directly: it reports 404 just as a HEAD would, without the
            # extra round-trip, and works on servers/proxies that reject HEAD.
            with self._session() as s:
                r = s.get(self.url, timeout=30)
                if r.status_code == 404:
                    print(f"[webdav-sync] Remote not found (404): {self.url}. Skip pull.")
                    return
                r.raise_for_status()
                content = r.content
        except requests.RequestException as e:
            print(f"[webdav-sync] Initial pull failed: {e}")
            return

        # Sanity check only; non-JSON content is still written as-is.
        try:
            json.loads(content.decode("utf-8"))
        except (ValueError, RecursionError):
            print("[webdav-sync] Warning: remote content is not valid JSON; still writing as-is.")

        try:
            self._write_local(content)
        except OSError as e:
            print(f"[webdav-sync] Initial pull failed: cannot write {self.local_path}: {e}")
            return
        self._last_pushed_hash = hashlib.sha1(content).hexdigest()
        print(f"[webdav-sync] Pulled remote -> {self.local_path}")

    def _write_local(self, data: bytes) -> None:
        """Replace the local file's content with *data*, atomically when possible.

        Writes a temp file next to the (symlink-resolved) target and renames
        it over the target, so readers -- including push_if_changed in another
        process -- never see a half-written file. If that is not possible
        (read-only directory, EBUSY on a single-file bind mount, ...), it
        falls back to rewriting the target in place.

        Raises:
            OSError: if the directory cannot be created or the data cannot be
                written at all.
        """
        os.makedirs(os.path.dirname(self.local_path) or ".", exist_ok=True)
        # Resolve symlinks so the link's target is replaced, not the link itself.
        target = os.path.realpath(self.local_path)
        tmp_path = f"{target}.{os.getpid()}.{threading.get_ident()}.tmp"
        try:
            with open(tmp_path, "wb") as f:
                f.write(data)
                f.flush()
                os.fsync(f.fileno())
            # Keep the existing file's permission bits (a new file gets umask defaults).
            with suppress(OSError):
                os.chmod(tmp_path, os.stat(target).st_mode & 0o7777)
            os.replace(tmp_path, target)
        except OSError:
            with open(target, "wb") as f:
                f.write(data)
        finally:
            with suppress(OSError):
                os.unlink(tmp_path)  # no-op after a successful replace

    # ====== Push (periodic; uploads local changes) ======
    def push_if_changed(self):
        """Upload the local file if its content changed since the last sync.

        The file is read once, and those same bytes are hashed and uploaded,
        so the recorded hash always matches what the remote received. A
        missing local file means there is nothing to push. HTTP/network
        failures are logged and retried on the next call; local read errors
        propagate to the caller.
        """
        if not self.enabled:
            return
        if not os.path.isfile(self.local_path):
            return  # no local file -> nothing to push
        with open(self.local_path, "rb") as f:
            data = f.read()
        current_hash = hashlib.sha1(data).hexdigest()
        if current_hash == self._last_pushed_hash:
            return
        try:
            with self._session() as s:
                r = s.put(self.url, data=data, timeout=30)
            if r.status_code >= 400:
                print(f"[webdav-sync] PUT failed: {r.status_code} {r.text[:200]}")
                return
        except requests.RequestException as e:
            print(f"[webdav-sync] Push failed: {e}")
            return
        self._last_pushed_hash = current_hash
        print(f"[webdav-sync] Pushed local -> {self.url}")

    def _loop(self):
        """Push every ``interval`` seconds until stop() is called.

        Any exception from a push is logged so that it cannot kill the thread.
        """
        while not self._stop.is_set():
            try:
                with self._lock:
                    self.push_if_changed()
            except Exception as e:
                print(f"[webdav-sync] Loop error: {e}")
            self._stop.wait(self.interval)

    def start(self):
        """Start the background push thread if this process wins the lock.

        The inter-process lock is kept for as long as the thread runs. It is
        released by stop(), or by the OS when the process exits. This ensures
        that at most one process sharing LOCKFILE_PATH pushes at any time.
        Calling start() while the thread is already running does nothing.
        """
        if not self.enabled:
            print("[webdav-sync] disabled; not starting background sync.")
            return
        if self._thread is not None and self._thread.is_alive():
            return
        fd = _try_acquire_lock(LOCKFILE_PATH)
        if fd is None:
            print("[webdav-sync] another process is already syncing; skip starting thread.")
            return
        self._lock_fd = fd
        # Baseline: whatever is on disk now counts as already in sync.
        self._last_pushed_hash = _sha1_of_file(self.local_path)
        self._stop.clear()  # allow start() again after an earlier stop()
        self._thread = threading.Thread(target=self._loop, name="WebDavSyncer", daemon=True)
        self._thread.start()

    def stop(self, final_push: bool = True):
        """Stop the background thread and release the inter-process lock.

        Args:
            final_push: when True (default) and this instance owns the sync
                thread, push once more so that changes made since the last
                tick are not lost on shutdown. Errors are logged, not raised.
        """
        self._stop.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=3)
        if self._lock_fd is None:
            return  # this instance never owned the sync thread
        try:
            if final_push:
                try:
                    with self._lock:  # waits for an in-flight loop push to finish
                        self.push_if_changed()
                except Exception as e:
                    print(f"[webdav-sync] Final push failed: {e}")
        finally:
            _release_lock(self._lock_fd)
            self._lock_fd = None


# ====== Convenience entry point for the application ======
def start_webdav_sync_if_configured(local_history_path: str):
    """
    Call at application start-up:
      1) try to pull the remote file over the local copy;
      2) start the background thread that pushes local changes every interval.

    Returns the syncer; keep a reference if you want to call stop() at shutdown.
    """
    syncer = WebDavSyncer(local_history_path)
    # Pull first, so a fresh deployment starts from the remote copy.
    syncer.initial_pull()
    # Then start the periodic push.
    syncer.start()
    return syncer