File size: 7,657 Bytes
9ecdee6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# webdav_sync.py
# -*- coding: utf-8 -*-
"""
WebDAV 同步器:启动时从远端拉取 history.json,随后每 30 分钟将本地变动回传到远端。
- 仅依赖 requests
- 支持 Basic Auth
- 避免多进程/多实例重复同步:使用文件锁保证只有一个同步线程在运行
环境变量(至少配置 WEBDAV_URL):
  WEBDAV_URL           : 远端文件的完整 URL(例如 https://dav.example.com/path/history.json)
  WEBDAV_USERNAME      : (可选)用户名
  WEBDAV_PASSWORD      : (可选)密码
  WEBDAV_VERIFY_SSL    : (可选)默认为 true;设为 "false" 可跳过证书验证
  WEBDAV_SYNC_INTERVAL : (可选)同步间隔秒,默认 1800(30 分钟)
  WEBDAV_SYNC_ENABLED  : (可选)默认启用;设为 "false" 可关闭
"""

import os
import time
import json
import hashlib
import threading
from contextlib import contextmanager
from typing import Optional

import requests

DEFAULT_INTERVAL = 1800  # 30 minutes
LOCKFILE_PATH = "/tmp/webdav_sync.lock"  # lock file for the inter-process lock taken while starting the sync thread

def _bool_env(name: str, default: bool) -> bool:
    """Read environment variable *name* as a boolean.

    Returns *default* when the variable is unset. Otherwise the value is
    stripped and lower-cased, and the result is True only for "1", "true",
    "yes" or "on"; every other value (including the empty string) is False.
    """
    raw = os.environ.get(name)
    if raw is not None:
        normalized = str(raw).strip().lower()
        return normalized in ("1", "true", "yes", "on")
    return default

def _int_env(name: str, default: int) -> int:
    """Read environment variable *name* as an integer.

    Returns *default* when the variable is unset or when its value cannot be
    parsed by ``int()``.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        parsed = int(raw)
    except ValueError:
        # Unparseable text falls back to the default instead of raising.
        return default
    return parsed

def _sha1_of_file(path: str) -> Optional[str]:
    """Return the hex SHA-1 digest of the file at *path*.

    Returns None when *path* is not an existing regular file. The file is
    read in 8192-byte pieces so it is never loaded into memory at once.
    """
    if not os.path.isfile(path):
        return None
    digest = hashlib.sha1()
    with open(path, "rb") as fh:
        while True:
            piece = fh.read(8192)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()

@contextmanager
def _interprocess_lock(path: str):
    """Simple lock file based on exclusive creation of *path*.

    Yields True when this call created the lock file (the lock is held for the
    duration of the ``with`` body and the file is removed afterwards), and
    False when the file already exists. Failing to get the lock never raises.

    Other errors from creating the file (for example a permission error)
    propagate to the caller.
    """
    try:
        # O_CREAT | O_EXCL: creation fails if the file already exists.
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o600)
    except FileExistsError:
        # Only the acquisition attempt is guarded here. The yields are kept
        # outside this handler so a FileExistsError raised by the caller's
        # ``with`` body propagates unchanged instead of making the generator
        # yield a second time (which contextlib reports as RuntimeError).
        yield False
        return

    try:
        os.write(fd, str(os.getpid()).encode("utf-8"))
        yield True
    finally:
        # Close and unlink independently so a failed close still removes the file.
        try:
            os.close(fd)
        except OSError:
            pass
        try:
            os.unlink(path)
        except OSError:
            pass

class WebDavSyncer:
    """Sync one local file with a single remote WebDAV URL.

    Configuration is read from environment variables when the instance is
    created: WEBDAV_URL, WEBDAV_USERNAME, WEBDAV_PASSWORD, WEBDAV_VERIFY_SSL,
    WEBDAV_SYNC_INTERVAL and WEBDAV_SYNC_ENABLED. Syncing is enabled only when
    WEBDAV_SYNC_ENABLED is true and WEBDAV_URL is non-empty.

    Typical use: call ``initial_pull()`` once, then ``start()`` to run a daemon
    thread that uploads the local file whenever its SHA-1 differs from the
    last recorded one.
    """

    def __init__(self, local_path: str):
        """Create a syncer for *local_path*, reading settings from the environment."""
        self.local_path = local_path
        self.url = os.environ.get("WEBDAV_URL", "").strip()
        self.username = os.environ.get("WEBDAV_USERNAME", "")
        self.password = os.environ.get("WEBDAV_PASSWORD", "")
        self.verify_ssl = _bool_env("WEBDAV_VERIFY_SSL", True)
        self.interval = _int_env("WEBDAV_SYNC_INTERVAL", DEFAULT_INTERVAL)
        if self.interval <= 0:
            # Event.wait() returns immediately for a non-positive timeout, which
            # would turn the sync loop into a busy loop; use the default instead.
            self.interval = DEFAULT_INTERVAL
        self.enabled = _bool_env("WEBDAV_SYNC_ENABLED", True) and bool(self.url)
        self._lock = threading.Lock()      # serializes pushes issued by the loop
        self._stop = threading.Event()     # set by stop() to end the loop
        self._thread = None                # background thread, once started
        self._last_pushed_hash = None      # SHA-1 of the content last pulled/pushed

    def _session(self) -> requests.Session:
        """Build a requests session carrying auth, TLS verification and User-Agent."""
        s = requests.Session()
        if self.username:
            s.auth = (self.username, self.password)
        s.verify = self.verify_ssl
        s.headers.update({"User-Agent": "history-sync/1.0"})
        return s

    # ====== Pull (run once at startup) ======
    def initial_pull(self):
        """Download the remote file over ``local_path`` (best effort).

        Does nothing when syncing is disabled, when the remote answers 404, or
        when the HEAD probe returns any status >= 400. Content that is not
        valid JSON is still written, with a warning. Network and local file
        errors are logged and never raised.
        """
        if not self.enabled:
            print("[webdav-sync] Disabled or missing WEBDAV_URL; skip initial pull.")
            return
        try:
            with self._session() as s:
                # Probe with HEAD first to see whether the remote exists.
                h = s.head(self.url, timeout=10)
                if h.status_code == 404:
                    print(f"[webdav-sync] Remote not found (404): {self.url}. Skip pull.")
                    return
                if h.status_code >= 400:
                    print(f"[webdav-sync] HEAD failed: {h.status_code} {h.text[:200]}")
                    return
                r = s.get(self.url, timeout=30)
                if r.status_code == 404:
                    print(f"[webdav-sync] Remote not found on GET: {self.url}.")
                    return
                r.raise_for_status()
                content = r.content
                # Light sanity check that the payload is JSON; it is only a warning.
                try:
                    json.loads(content.decode("utf-8"))
                except Exception:
                    print("[webdav-sync] Warning: remote content is not valid JSON; still writing as-is.")
                # Write to the local path, creating the parent directory if needed.
                os.makedirs(os.path.dirname(self.local_path) or ".", exist_ok=True)
                with open(self.local_path, "wb") as f:
                    f.write(content)
                # What was just pulled is by definition already on the remote.
                self._last_pushed_hash = _sha1_of_file(self.local_path)
                print(f"[webdav-sync] Pulled remote -> {self.local_path}")
        except (requests.RequestException, OSError) as e:
            # OSError covers failures creating the directory or writing the file,
            # so the pull stays best-effort without relying on RequestException's
            # own base class.
            print(f"[webdav-sync] Initial pull failed: {e}")

    # ====== Push (periodic; uploads local changes) ======
    def push_if_changed(self):
        """Upload the local file with PUT if its SHA-1 differs from the last one recorded.

        Does nothing when syncing is disabled, when the local file does not
        exist, or when the hash is unchanged. The recorded hash is updated only
        after a PUT that returned a status below 400. Network and local file
        errors are logged and never raised.
        """
        if not self.enabled:
            return
        current_hash = _sha1_of_file(self.local_path)
        if not current_hash:
            # No local file, nothing to push.
            return
        if current_hash == self._last_pushed_hash:
            return
        try:
            with self._session() as s, open(self.local_path, "rb") as f:
                r = s.put(self.url, data=f, timeout=30)
                if r.status_code >= 400:
                    print(f"[webdav-sync] PUT failed: {r.status_code} {r.text[:200]}")
                    return
                self._last_pushed_hash = current_hash
                print(f"[webdav-sync] Pushed local -> {self.url}")
        except (requests.RequestException, OSError) as e:
            # OSError covers the file disappearing between hashing and opening.
            print(f"[webdav-sync] Push failed: {e}")

    def _loop(self):
        """Thread body: push, then wait ``interval`` seconds, until stop() is called."""
        while not self._stop.is_set():
            try:
                with self._lock:
                    self.push_if_changed()
            except Exception as e:
                # Keep the thread alive whatever a single iteration raises.
                print(f"[webdav-sync] Loop error: {e}")
            # wait() rather than sleep() so stop() wakes the thread immediately.
            self._stop.wait(self.interval)

    def start(self):
        """Start the background push thread if syncing is enabled.

        Does nothing when a thread started by this instance is still alive, or
        when another process currently holds the lock file. May be called again
        after ``stop()`` to restart syncing.
        """
        if not self.enabled:
            print("[webdav-sync] disabled; not starting background sync.")
            return
        if self._thread is not None and self._thread.is_alive():
            # A second call must not spawn a second uploader thread.
            print("[webdav-sync] background sync already running; not starting another thread.")
            return
        # Inter-process lock: only one process at a time gets to create its thread.
        with _interprocess_lock(LOCKFILE_PATH) as ok:
            if not ok:
                print("[webdav-sync] another process is already syncing; skip starting thread.")
                return
            # Clear a stop request left by an earlier stop(); otherwise the new
            # thread would exit before its first iteration.
            self._stop.clear()
            # Record the current content as the baseline.
            self._last_pushed_hash = _sha1_of_file(self.local_path)
            self._thread = threading.Thread(target=self._loop, name="WebDavSyncer", daemon=True)
            self._thread.start()
            # NOTE: the lock file is released on leaving this block, on purpose.
            # It only prevents several processes from creating their threads at
            # the same moment; it does not stop a process that starts later
            # from running its own sync thread.

    def stop(self):
        """Ask the background thread to stop and wait up to 3 seconds for it to exit."""
        self._stop.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=3)

# ====== 供应用调用的便捷入口 ======

def start_webdav_sync_if_configured(local_history_path: str):
    """Convenience entry point to call once at application startup.

    Creates a ``WebDavSyncer`` for *local_history_path*, then:
      1) tries to pull the remote file over the local one;
      2) starts the background thread that pushes local changes at every interval.

    Returns the syncer so the caller can keep the reference and call its
    ``stop()`` when the application shuts down.
    """
    instance = WebDavSyncer(local_history_path)
    # Pull before starting the pusher, so the local file begins from the remote copy.
    instance.initial_pull()
    instance.start()
    return instance