BG5 commited on
Commit
449b2b9
·
verified ·
1 Parent(s): 26891b6

Update r2_sync.py

Browse files
Files changed (1) hide show
  1. r2_sync.py +125 -290
r2_sync.py CHANGED
@@ -17,10 +17,10 @@ from urllib.parse import unquote
17
  logging.basicConfig(
18
  level=logging.INFO,
19
  format='%(asctime)s - %(levelname)s - %(message)s',
20
- # handlers=[
21
- # logging.FileHandler('r2_sync.log'),
22
- # logging.StreamHandler()
23
- # ]
24
  )
25
  logger = logging.getLogger(__name__)
26
 
@@ -35,24 +35,24 @@ class Config:
35
  self.max_upload_workers = int(os.getenv("MAX_UPLOAD_WORKERS", "4"))
36
  self.upload_retry_delay = int(os.getenv("UPLOAD_RETRY_DELAY", "15")) # 秒
37
 
38
- # 忽略的文件后缀配置(逗号分隔,如 ".tmp,.log")
39
- ignore_suffixes = os.getenv("IGNORE_SUFFIXES", ".db-journal,.tmp,.log")
40
- self.ignore_suffixes = [s.strip().lower() for s in ignore_suffixes.split(",") if s.strip()]
 
 
41
 
42
- # 只同步的文件后缀配置(逗号分隔,如 ".jpg,.png",优先级高于忽略后缀)
43
  only_suffixes = os.getenv("ONLY_SUFFIXES", "")
44
  self.only_suffixes = [s.strip().lower() for s in only_suffixes.split(",") if s.strip()]
45
 
46
- # 验证配置
47
  if not all([self.api_key, self.api_endpoint]):
48
  raise ValueError("必须设置API_AUTH_KEY和API_ENDPOINT环境变量")
49
 
50
- # 检查冲突配置
51
- if self.ignore_suffixes and self.only_suffixes:
52
- logger.warning("同时设置了IGNORE_SUFFIXES和ONLY_SUFFIXES,将优先使用ONLY_SUFFIXES")
53
 
54
  class SyncHandler(FileSystemEventHandler):
55
- """文件系统事件处理器"""
56
  def __init__(self, callback, config):
57
  self.callback = callback
58
  self.config = config
@@ -61,16 +61,12 @@ class SyncHandler(FileSystemEventHandler):
61
  """检查文件是否应该被同步"""
62
  file_path = str(file_path).lower()
63
 
64
- # 优先检查白名单模式
65
  if self.config.only_suffixes:
66
  return any(file_path.endswith(suffix) for suffix in self.config.only_suffixes)
67
 
68
- # 然后检查黑名单模式
69
- if self.config.ignore_suffixes:
70
- return not any(file_path.endswith(suffix) for suffix in self.config.ignore_suffixes)
71
-
72
- # 默认同步所有文件
73
- return True
74
 
75
  def on_modified(self, event):
76
  if not event.is_directory and self.should_sync(event.src_path):
@@ -81,37 +77,28 @@ class SyncHandler(FileSystemEventHandler):
81
  self.callback(event.src_path)
82
 
83
  class R2Sync:
84
- """R2存储同步器"""
85
  def __init__(self, config):
86
  self.config = config
87
- self.file_states = {} # 跟踪文件最后状态 (size, mtime, hash?)
88
- self.upload_queue = {} # 上传队列 {file_path: (key, mtime, timer)}
89
- self.remote_files_cache = None # 缓存远程文件列表
90
- self.cache_valid = False # 缓存是否有效
91
  self.executor = ThreadPoolExecutor(max_workers=self.config.max_upload_workers)
92
  self.cache_lock = threading.Lock()
93
  self.upload_queue_lock = threading.Lock()
94
 
95
- # 确保数据目录存在
96
  self.config.data_dir.mkdir(parents=True, exist_ok=True)
97
 
98
  def should_sync_file(self, file_path):
99
- """检查文件是否应该被同步"""
100
  file_path = str(file_path).lower()
101
-
102
- # 优先检查白名单模式
103
  if self.config.only_suffixes:
104
  return any(file_path.endswith(suffix) for suffix in self.config.only_suffixes)
105
-
106
- # 然后检查黑名单模式
107
- if self.config.ignore_suffixes:
108
- return not any(file_path.endswith(suffix) for suffix in self.config.ignore_suffixes)
109
-
110
- # 默认同步所有文件
111
- return True
112
 
113
  def r2_api_request(self, method, path, data=None, max_retries=3):
114
- """发送API请求"""
115
  url = f"{self.config.api_endpoint}/{path}"
116
  headers = {
117
  'X-API-Key': self.config.api_key,
@@ -124,56 +111,53 @@ class R2Sync:
124
  resp = requests.get(url, headers=headers)
125
  elif method == "POST":
126
  resp = requests.post(url, data=data, headers=headers)
127
- elif method == "PUT":
128
- resp = requests.put(url, data=data, headers=headers)
129
  elif method == "DELETE":
130
  resp = requests.delete(url, headers=headers)
131
-
132
- resp.raise_for_status()
133
 
134
- if method == "GET":
135
- return resp.content
136
- elif method == "DELETE":
137
- return True
138
- return resp.json()
139
 
140
  except requests.exceptions.RequestException as e:
141
  if attempt == max_retries - 1:
142
- logger.error(f"API请求最终失败: {e}")
143
  return None
144
  wait_time = (attempt + 1) * 2
145
- logger.warning(f"API请求失败(尝试 {attempt + 1}/{max_retries}), {wait_time}秒后重试...")
146
  time.sleep(wait_time)
147
 
148
  def get_file_hash(self, file_path):
149
- """计算文件哈希值"""
 
 
 
150
  hash_obj = hashlib.blake2b()
151
  with open(file_path, 'rb') as f:
152
  while chunk := f.read(8192):
153
  hash_obj.update(chunk)
154
  return hash_obj.hexdigest()
155
 
156
- def is_file_modified(self, file_path, last_known_state):
157
- """检查文件是否修改"""
158
  try:
159
- stat = os.stat(file_path)
160
- current_state = (stat.st_size, stat.st_mtime)
161
 
162
- if last_known_state[:2] == current_state:
163
- return False
 
164
 
 
165
  if self.config.use_hash_check:
166
  current_hash = self.get_file_hash(file_path)
167
- if len(last_known_state) > 2:
168
- return current_hash != last_known_state[2]
169
- return True
170
- return True
171
  except Exception as e:
172
- logger.error(f"检查文件修改状态出错: {e}")
173
  return False
174
-
175
  def get_remote_files(self, force_refresh=False):
176
- """获取远程文件列表,使用缓存"""
177
  with self.cache_lock:
178
  if force_refresh or not self.cache_valid or self.remote_files_cache is None:
179
  self.remote_files_cache = self._fetch_remote_files()
@@ -187,195 +171,100 @@ class R2Sync:
187
  return None
188
 
189
  try:
190
- raw_resp = resp.decode('utf-8').strip()
191
- if raw_resp == "[]":
192
- return []
193
-
194
- data = json.loads(raw_resp)
195
-
196
- # 统一处理为列表格式
197
  if isinstance(data, dict):
198
- if 'objects' in data:
199
- data = data['objects']
200
- else:
201
- data = [data]
202
- elif not isinstance(data, list):
203
- data = [data]
204
-
205
- # 处理文件名编码
206
- for item in data:
207
- if isinstance(item, dict) and 'key' in item:
208
- try:
209
- item['key'] = unquote(item['key'])
210
- except:
211
- pass
212
-
213
- return data
214
  except Exception as e:
215
- logger.error(f"解析远程文件列表出错: {e}")
216
  return []
217
-
218
- def invalidate_cache(self):
219
- """使缓存失效"""
220
- with self.cache_lock:
221
- self.cache_valid = False
222
-
223
  def download_file(self, key, dest_path):
224
- """下载文件"""
225
  if not self.should_sync_file(dest_path):
226
- logger.debug(f"忽略下载文件(不匹配同步规则): {key}")
227
  return False
228
 
229
- path = f"download/{key}"
230
- logger.debug(f"下载路径: {dest_path} (原始key: {key})")
231
-
232
- resp = self.r2_api_request("GET", path)
233
- if not resp:
234
  return False
235
 
236
- # 确保目标目录存在
237
  dest_path.parent.mkdir(parents=True, exist_ok=True)
238
-
239
- # 处理不同响应类型
240
- if isinstance(resp, bytes):
241
- content = resp
242
- else:
243
- try:
244
- content = resp.decode('utf-8').encode('utf-8')
245
- except UnicodeDecodeError:
246
- content = resp
247
-
248
  with open(dest_path, 'wb') as f:
249
- f.write(content)
250
 
251
- logger.info(f"文件已保存到: {dest_path} (大小: {len(content)}字节)")
 
 
 
 
 
252
  return True
253
-
254
- def _upload_file_task(self, file_path, key):
255
- """线程池中执行的上传任务"""
256
  if not self.should_sync_file(file_path):
257
- logger.debug(f"忽略上传文件(不匹配同步规则): {key}")
258
  return False
259
 
260
- path = f"upload/{key}"
261
  try:
262
  with open(file_path, 'rb') as f:
263
- result = self.r2_api_request("POST", path, f.read()) is not None
264
- if result:
265
- self.invalidate_cache() # 上传成功,使缓存失效
266
- return result
267
  except Exception as e:
268
- logger.error(f"上传文件出错: {e}")
269
  return False
270
-
271
- def upload_file(self, file_path, key):
272
- """上传文件"""
273
- # 提交到线程池执行
274
- future = self.executor.submit(self._upload_file_task, file_path, key)
275
- return future.result() # 阻塞等待结果
276
-
277
  def delete_file(self, key):
278
- """删除文件并更新缓存"""
279
  if not self.should_sync_file(key):
280
- logger.debug(f"忽略删除文件(不匹配同步规则): {key}")
281
  return False
282
 
283
- logger.info(f"删除远程文件: {key}")
284
- path = f"delete/{key}"
285
- if not self.r2_api_request("DELETE", path):
286
  return False
287
 
288
- # 使缓存失效
289
  self.invalidate_cache()
290
-
291
- # 验证文件是否真的被删除
292
- remote_files = self.get_remote_files(force_refresh=True)
293
- if remote_files is None:
294
- return False
295
-
296
- return not any(file_info['key'] == key for file_info in remote_files)
297
-
298
- def check_api_health(self):
299
- """检查API健康状态"""
300
- logger.info("检查API健康状态...")
301
-
302
- # 测试列表API
303
- files = self.get_remote_files()
304
- if files is None:
305
- logger.error("列表API测试失败")
306
- return False
307
-
308
- logger.info("API健康状态检查通过")
309
  return True
310
-
311
  def init_sync(self):
312
- """初始化同步"""
313
- logger.info("初始化数据目录...")
314
-
315
- if not self.check_api_health():
316
- logger.error("API检查失败,请检查配置和网络连接")
317
- sys.exit(1)
318
-
319
- # 获取远程文件列表
320
- remote_files = self.get_remote_files()
321
- if remote_files is None:
322
- logger.error("获取远程文件列表失败")
323
  sys.exit(1)
324
 
325
- logger.info(f"找到 {len(remote_files)} 个远程文件")
326
- if not remote_files:
327
- logger.info("远程存储桶为空,无需同步")
328
- return
329
-
330
- logger.info("开始同步远程文件...")
331
- for file_info in remote_files:
332
  key = file_info['key']
333
  dest_path = self.config.data_dir / key
334
-
335
- if not self.should_sync_file(dest_path):
336
- logger.debug(f"忽略下载文件(不匹配同步规则): {key}")
337
- continue
338
-
339
- logger.info(f"下载: {key} -> {dest_path}")
340
- if self.download_file(key, dest_path):
341
- # 记录文件初始状态
342
- stat = dest_path.stat()
343
- file_state = (stat.st_size, stat.st_mtime)
344
- if self.config.use_hash_check:
345
- file_state += (self.get_file_hash(dest_path),)
346
- self.file_states[key] = file_state
347
-
348
  def handle_file_change(self, file_path):
349
- """处理文件变化事件,加入上传队列并设置定时检查"""
350
  try:
351
  if not self.should_sync_file(file_path):
352
- logger.debug(f"忽略文件变化(不匹配同步规则): {file_path}")
353
  return
354
 
355
- rel_path = Path(file_path).relative_to(self.config.data_dir)
356
- key = str(rel_path).replace('\\', '/')
357
-
358
- # 获取文件当前状态
359
- stat = os.stat(file_path)
360
- current_mtime = stat.st_mtime
361
 
362
  with self.upload_queue_lock:
363
- # 如果文件已在队列中,更新修改时间并重置计时器
364
  if file_path in self.upload_queue:
365
- _, _, timer = self.upload_queue[file_path]
366
- timer.cancel() # 取消之前的计时器
367
 
368
- # 创建新计时器,15秒后检查
369
  timer = threading.Timer(
370
  self.config.upload_retry_delay,
371
  self._process_upload_queue,
372
  args=(file_path,)
373
  )
374
  timer.start()
375
-
376
- # 加入/更新队列
377
- self.upload_queue[file_path] = (key, current_mtime, timer)
378
- logger.info(f"检测到文件变化,加入上传队列: {key} (将在{self.config.upload_retry_delay}秒后检查)")
379
 
380
  except Exception as e:
381
  logger.error(f"处理文件变化出错: {e}")
@@ -385,100 +274,57 @@ class R2Sync:
385
  with self.upload_queue_lock:
386
  if file_path not in self.upload_queue:
387
  return
388
-
389
- key, original_mtime, _ = self.upload_queue[file_path]
390
- del self.upload_queue[file_path] # 从队列移除
391
-
392
  try:
393
- # 检查文件是否还存在
394
  if not os.path.exists(file_path):
395
- logger.info(f"文件已被删除,取消上传: {key}")
396
  return
397
 
398
- # 获取当前文件状态
399
- current_stat = os.stat(file_path)
400
- current_mtime = current_stat.st_mtime
401
 
402
- # 比较修改时间,确认文件是否又有新变化
403
- if current_mtime != original_mtime:
404
- logger.info(f"文件 {key} 在等待期间又有新变化,重新加入队列")
405
- self.handle_file_change(file_path) # 重新加入队列
 
 
406
  return
407
 
408
- # 确认文件无新变化,开始上传
409
- logger.info(f"开始上传文件: {key}")
410
  if self.upload_file(file_path, key):
411
- # 更新文件状态
412
- file_state = (current_stat.st_size, current_mtime)
413
- if self.config.use_hash_check:
414
- file_state += (self.get_file_hash(file_path),)
415
- self.file_states[key] = file_state
416
- else:
417
- logger.error(f"上传失败: {key}")
418
-
419
- except Exception as e:
420
- logger.error(f"处理上传队列出错: {e}")
421
-
422
- def sync_deleted_files(self):
423
- """同步删除操作"""
424
- try:
425
- remote_files = self.get_remote_files()
426
- if remote_files is None:
427
- return
428
-
429
- local_files = {
430
- str(f.relative_to(self.config.data_dir)).replace('\\', '/')
431
- for f in self.config.data_dir.rglob('*')
432
- if f.is_file() and self.should_sync_file(f)
433
- }
434
-
435
- for file_info in remote_files:
436
- key = file_info['key']
437
- if key not in local_files and key not in self.upload_queue:
438
- logger.info(f"删除远程文件: {key}")
439
- if self.delete_file(key):
440
- if key in self.file_states:
441
- del self.file_states[key]
442
- else:
443
- logger.error(f"删除失败: {key}")
444
  except Exception as e:
445
- logger.error(f"同步删除操作出错: {e}")
446
 
447
  def watch_and_sync(self):
448
- """监控并同步文件"""
449
- logger.info("启动持续同步服务...")
450
-
451
- # 初始化文件状态(根据同步规则)
452
  for file in self.config.data_dir.rglob('*'):
453
  if file.is_file() and self.should_sync_file(file):
454
- rel_path = str(file.relative_to(self.config.data_dir)).replace('\\', '/')
455
- stat = file.stat()
456
- file_state = (stat.st_size, stat.st_mtime)
457
- if self.config.use_hash_check:
458
- file_state += (self.get_file_hash(file),)
459
- self.file_states[rel_path] = file_state
460
 
461
- # 设置文件监控(根据同步规则)
462
- event_handler = SyncHandler(self.handle_file_change, self.config)
463
  observer = Observer()
464
- observer.schedule(event_handler, path=str(self.config.data_dir), recursive=True)
 
 
 
 
465
  observer.start()
466
 
467
  try:
468
  while True:
469
- # 定期检查远程文件删除
470
  self.sync_deleted_files()
471
  time.sleep(self.config.sync_interval)
472
- except KeyboardInterrupt:
473
- logger.info("收到停止信号,关闭监控...")
474
- # 取消所有待处理的上传计时器
475
- with self.upload_queue_lock:
476
- for file_path, (_, _, timer) in self.upload_queue.items():
477
- timer.cancel()
478
- self.upload_queue.clear()
479
  observer.stop()
480
- observer.join()
481
- self.executor.shutdown()
482
 
483
  def main():
484
  if len(sys.argv) < 2:
@@ -486,21 +332,10 @@ def main():
486
  sys.exit(1)
487
 
488
  try:
489
- config = Config()
490
- except ValueError as e:
491
- logger.error(str(e))
492
- sys.exit(1)
493
-
494
- r2_sync = R2Sync(config)
495
- command = sys.argv[1]
496
-
497
- if command == "init":
498
- r2_sync.init_sync()
499
- elif command == "sync":
500
- logger.info(f"启动同步服务,间隔: {config.sync_interval}秒")
501
- r2_sync.watch_and_sync()
502
- else:
503
- logger.error(f"未知命令: {command}")
504
  sys.exit(1)
505
 
506
  if __name__ == "__main__":
 
17
  logging.basicConfig(
18
  level=logging.INFO,
19
  format='%(asctime)s - %(levelname)s - %(message)s',
20
+ handlers=[
21
+ logging.FileHandler('r2_sync.log'),
22
+ logging.StreamHandler()
23
+ ]
24
  )
25
  logger = logging.getLogger(__name__)
26
 
 
35
  self.max_upload_workers = int(os.getenv("MAX_UPLOAD_WORKERS", "4"))
36
  self.upload_retry_delay = int(os.getenv("UPLOAD_RETRY_DELAY", "15")) # 秒
37
 
38
+ # 默认忽略的临时文件后缀
39
+ self.ignore_suffixes = [".tmp", ".log", ".bak", ".swp", ".db-journal"]
40
+ # 从环境变量追加忽略后缀
41
+ custom_ignore = os.getenv("IGNORE_SUFFIXES", "")
42
+ self.ignore_suffixes.extend([s.strip().lower() for s in custom_ignore.split(",") if s.strip()])
43
 
44
+ # 白名单模式(优先级高于黑名单)
45
  only_suffixes = os.getenv("ONLY_SUFFIXES", "")
46
  self.only_suffixes = [s.strip().lower() for s in only_suffixes.split(",") if s.strip()]
47
 
 
48
  if not all([self.api_key, self.api_endpoint]):
49
  raise ValueError("必须设置API_AUTH_KEY和API_ENDPOINT环境变量")
50
 
51
+ if self.only_suffixes and self.ignore_suffixes:
52
+ logger.info(f"白名单模式激活,将只同步以下后缀的文件: {', '.join(self.only_suffixes)}")
 
53
 
54
  class SyncHandler(FileSystemEventHandler):
55
+ """文件系统事件处理器(带过滤功能)"""
56
  def __init__(self, callback, config):
57
  self.callback = callback
58
  self.config = config
 
61
  """检查文件是否应该被同步"""
62
  file_path = str(file_path).lower()
63
 
64
+ # 白名单模式优先
65
  if self.config.only_suffixes:
66
  return any(file_path.endswith(suffix) for suffix in self.config.only_suffixes)
67
 
68
+ # 黑名单模式
69
+ return not any(file_path.endswith(suffix) for suffix in self.config.ignore_suffixes)
 
 
 
 
70
 
71
  def on_modified(self, event):
72
  if not event.is_directory and self.should_sync(event.src_path):
 
77
  self.callback(event.src_path)
78
 
79
  class R2Sync:
80
+ """R2存储同步器(优化版)"""
81
  def __init__(self, config):
82
  self.config = config
83
+ self.file_states = {} # {file_path: (size, hash?)}
84
+ self.upload_queue = {} # {file_path: (key, size, hash?, timer)}
85
+ self.remote_files_cache = None
86
+ self.cache_valid = False
87
  self.executor = ThreadPoolExecutor(max_workers=self.config.max_upload_workers)
88
  self.cache_lock = threading.Lock()
89
  self.upload_queue_lock = threading.Lock()
90
 
 
91
  self.config.data_dir.mkdir(parents=True, exist_ok=True)
92
 
93
  def should_sync_file(self, file_path):
94
+ """统一文件过滤逻辑"""
95
  file_path = str(file_path).lower()
 
 
96
  if self.config.only_suffixes:
97
  return any(file_path.endswith(suffix) for suffix in self.config.only_suffixes)
98
+ return not any(file_path.endswith(suffix) for suffix in self.config.ignore_suffixes)
 
 
 
 
 
 
99
 
100
  def r2_api_request(self, method, path, data=None, max_retries=3):
101
+ """带重试机制的API请求"""
102
  url = f"{self.config.api_endpoint}/{path}"
103
  headers = {
104
  'X-API-Key': self.config.api_key,
 
111
  resp = requests.get(url, headers=headers)
112
  elif method == "POST":
113
  resp = requests.post(url, data=data, headers=headers)
 
 
114
  elif method == "DELETE":
115
  resp = requests.delete(url, headers=headers)
 
 
116
 
117
+ resp.raise_for_status()
118
+ return resp.content if method == "GET" else resp.json()
 
 
 
119
 
120
  except requests.exceptions.RequestException as e:
121
  if attempt == max_retries - 1:
122
+ logger.error(f"API请求失败: {e}")
123
  return None
124
  wait_time = (attempt + 1) * 2
125
+ logger.warning(f"请求失败,{wait_time}秒后重试...")
126
  time.sleep(wait_time)
127
 
128
  def get_file_hash(self, file_path):
129
+ """计算文件哈希(Blake2b算法)"""
130
+ if not self.config.use_hash_check:
131
+ return None
132
+
133
  hash_obj = hashlib.blake2b()
134
  with open(file_path, 'rb') as f:
135
  while chunk := f.read(8192):
136
  hash_obj.update(chunk)
137
  return hash_obj.hexdigest()
138
 
139
+ def is_file_modified(self, file_path):
140
+ """优化版修改检测:仅当大小变化或哈希不匹配时返回True"""
141
  try:
142
+ current_size = os.path.getsize(file_path)
143
+ last_state = self.file_states.get(file_path, (None, None))
144
 
145
+ # 大小不同肯定被修改了
146
+ if current_size != last_state[0]:
147
+ return True
148
 
149
+ # 大小相同但启用了哈希检查
150
  if self.config.use_hash_check:
151
  current_hash = self.get_file_hash(file_path)
152
+ return current_hash != last_state[1]
153
+
154
+ return False
 
155
  except Exception as e:
156
+ logger.error(f"修改检查错误: {e}")
157
  return False
158
+
159
  def get_remote_files(self, force_refresh=False):
160
+ """带缓存的远程文件列表获取"""
161
  with self.cache_lock:
162
  if force_refresh or not self.cache_valid or self.remote_files_cache is None:
163
  self.remote_files_cache = self._fetch_remote_files()
 
171
  return None
172
 
173
  try:
174
+ data = json.loads(resp.decode('utf-8'))
 
 
 
 
 
 
175
  if isinstance(data, dict):
176
+ return data.get('objects', [data])
177
+ return data if isinstance(data, list) else [data]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
178
  except Exception as e:
179
+ logger.error(f"解析文件列表出错: {e}")
180
  return []
181
+
 
 
 
 
 
182
  def download_file(self, key, dest_path):
183
+ """下载文件到本��"""
184
  if not self.should_sync_file(dest_path):
185
+ logger.debug(f"忽略下载: {key}")
186
  return False
187
 
188
+ content = self.r2_api_request("GET", f"download/{key}")
189
+ if not content:
 
 
 
190
  return False
191
 
 
192
  dest_path.parent.mkdir(parents=True, exist_ok=True)
 
 
 
 
 
 
 
 
 
 
193
  with open(dest_path, 'wb') as f:
194
+ f.write(content if isinstance(content, bytes) else content.encode('utf-8'))
195
 
196
+ # 记录文件状态
197
+ file_size = os.path.getsize(dest_path)
198
+ file_hash = self.get_file_hash(dest_path) if self.config.use_hash_check else None
199
+ self.file_states[dest_path] = (file_size, file_hash)
200
+
201
+ logger.info(f"下载完成: {key} ({file_size}字节)")
202
  return True
203
+
204
+ def upload_file(self, file_path, key):
205
+ """上传文件到R2"""
206
  if not self.should_sync_file(file_path):
207
+ logger.debug(f"忽略上传: {key}")
208
  return False
209
 
 
210
  try:
211
  with open(file_path, 'rb') as f:
212
+ success = self.r2_api_request("POST", f"upload/{key}", f.read()) is not None
213
+ if success:
214
+ self.invalidate_cache()
215
+ return success
216
  except Exception as e:
217
+ logger.error(f"上传失败: {e}")
218
  return False
219
+
 
 
 
 
 
 
220
  def delete_file(self, key):
221
+ """删除远程文件"""
222
  if not self.should_sync_file(key):
223
+ logger.debug(f"忽略删除: {key}")
224
  return False
225
 
226
+ if not self.r2_api_request("DELETE", f"delete/{key}"):
 
 
227
  return False
228
 
 
229
  self.invalidate_cache()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
230
  return True
231
+
232
  def init_sync(self):
233
+ """初始化同步:下载所有远程文件"""
234
+ if not self.get_remote_files():
235
+ logger.error("无法获取远程文件列表")
 
 
 
 
 
 
 
 
236
  sys.exit(1)
237
 
238
+ for file_info in self.get_remote_files():
 
 
 
 
 
 
239
  key = file_info['key']
240
  dest_path = self.config.data_dir / key
241
+ if self.should_sync_file(dest_path):
242
+ self.download_file(key, dest_path)
243
+
 
 
 
 
 
 
 
 
 
 
 
244
  def handle_file_change(self, file_path):
245
+ """处理文件变化事件(带防抖)"""
246
  try:
247
  if not self.should_sync_file(file_path):
 
248
  return
249
 
250
+ key = str(Path(file_path).relative_to(self.config.data_dir)).replace('\\', '/')
251
+ current_size = os.path.getsize(file_path)
252
+ current_hash = self.get_file_hash(file_path) if self.config.use_hash_check else None
 
 
 
253
 
254
  with self.upload_queue_lock:
255
+ # 取消已有计时器
256
  if file_path in self.upload_queue:
257
+ self.upload_queue[file_path][3].cancel()
 
258
 
259
+ # 创建新计时器
260
  timer = threading.Timer(
261
  self.config.upload_retry_delay,
262
  self._process_upload_queue,
263
  args=(file_path,)
264
  )
265
  timer.start()
266
+ self.upload_queue[file_path] = (key, current_size, current_hash, timer)
267
+ logger.info(f"文件加入上传队列: {key}")
 
 
268
 
269
  except Exception as e:
270
  logger.error(f"处理文件变化出错: {e}")
 
274
  with self.upload_queue_lock:
275
  if file_path not in self.upload_queue:
276
  return
277
+ key, original_size, original_hash, _ = self.upload_queue.pop(file_path)
278
+
 
 
279
  try:
 
280
  if not os.path.exists(file_path):
281
+ logger.info(f"文件已删除: {key}")
282
  return
283
 
284
+ current_size = os.path.getsize(file_path)
285
+ current_hash = self.get_file_hash(file_path) if self.config.use_hash_check else None
 
286
 
287
+ # 检查是否有新变化
288
+ if current_size != original_size or (
289
+ self.config.use_hash_check and current_hash != original_hash
290
+ ):
291
+ logger.info(f"文件有变化,重新加入队列: {key}")
292
+ self.handle_file_change(file_path)
293
  return
294
 
295
+ # 执行上传
 
296
  if self.upload_file(file_path, key):
297
+ self.file_states[file_path] = (current_size, current_hash)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
298
  except Exception as e:
299
+ logger.error(f"处理上传出错: {e}")
300
 
301
  def watch_and_sync(self):
302
+ """主监控循环"""
303
+ # 初始化文件状态
 
 
304
  for file in self.config.data_dir.rglob('*'):
305
  if file.is_file() and self.should_sync_file(file):
306
+ self.file_states[file] = (
307
+ os.path.getsize(file),
308
+ self.get_file_hash(file) if self.config.use_hash_check else None
309
+ )
 
 
310
 
311
+ # 启动文件监控
 
312
  observer = Observer()
313
+ observer.schedule(
314
+ SyncHandler(self.handle_file_change, self.config),
315
+ path=str(self.config.data_dir),
316
+ recursive=True
317
+ )
318
  observer.start()
319
 
320
  try:
321
  while True:
 
322
  self.sync_deleted_files()
323
  time.sleep(self.config.sync_interval)
324
+ finally:
 
 
 
 
 
 
325
  observer.stop()
326
+ observer.join()
327
+ self.executor.shutdown()
328
 
329
  def main():
330
  if len(sys.argv) < 2:
 
332
  sys.exit(1)
333
 
334
  try:
335
+ r2_sync = R2Sync(Config())
336
+ {"init": r2_sync.init_sync, "sync": r2_sync.watch_and_sync}[sys.argv[1]]()
337
+ except Exception as e:
338
+ logger.error(f"运行失败: {e}")
 
 
 
 
 
 
 
 
 
 
 
339
  sys.exit(1)
340
 
341
  if __name__ == "__main__":