openclaw / sync.py
a8926764's picture
Update sync.py
9e895f4 verified
#!/usr/bin/env python3
"""
OpenClaw 配置同步脚本
功能:从 Hugging Face Dataset 拉取配置,并定时推送变更回 Dataset
作者:根据用户需求生成
日期:2026-02-08
更新:修复 huggingface_hub 导入问题
"""
import os
import json
import time
import logging
import hashlib
import shutil
from pathlib import Path
from huggingface_hub import HfApi, hf_hub_download, upload_file, list_repo_files
# Configure root logging once at import time: INFO level and a
# "timestamp - level - message" line format.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class OpenClawConfigSync:
    """Keep OpenClaw config files in sync with a Hugging Face Dataset repo.

    Config files (``.json``, ``.yaml``, ``.yml``) are pulled from the Dataset
    into ``local_config_dir`` and pushed back when their content changes.
    Change detection compares MD5 digests of the local files against the
    digests recorded at the last successful download or upload.
    """

    # Extensions treated as configuration files in both sync directions.
    CONFIG_SUFFIXES = ('.json', '.yaml', '.yml')

    # Maps file name -> MD5 digest as of the last successful exchange with
    # the Dataset. ``None`` means no baseline has been recorded yet.
    _synced_hashes = None

    def __init__(self):
        """Read credentials from the environment and set up working paths.

        Raises:
            ValueError: If ``HF_TOKEN`` or ``HF_DATASET`` is unset or empty.
        """
        self.hf_token = os.getenv('HF_TOKEN', '')
        self.dataset_repo = os.getenv('HF_DATASET', '')
        self.local_config_dir = Path('/root/.openclaw')
        self.sync_interval = 300  # seconds between periodic uploads (5 min)
        self._synced_hashes = None

        if not self.hf_token or not self.dataset_repo:
            logger.error('HF_TOKEN 或 HF_DATASET 环境变量未设置')
            raise ValueError('缺少必要的环境变量')

        self.api = HfApi(token=self.hf_token)
        self.repo_dir = Path('/tmp/openclaw_dataset')

    def calculate_file_hash(self, file_path):
        """Return the hex MD5 digest of ``file_path``, or ``None`` on failure.

        The digest is used only to detect content changes, not for security.
        Any error while reading is logged and reported as ``None``.
        """
        hash_md5 = hashlib.md5()
        try:
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_md5.update(chunk)
            return hash_md5.hexdigest()
        except Exception as e:
            logger.error(f"计算文件哈希失败 {file_path}: {e}")
            return None

    @staticmethod
    def _backup_path(dest_file):
        """Return the timestamped backup path for ``dest_file``.

        The marker is appended to the full file name (``app.json.bak<ts>``)
        so that files sharing a stem, such as ``app.json`` and ``app.yaml``,
        never map to the same backup name.
        """
        return dest_file.with_name(f'{dest_file.name}.bak{int(time.time())}')

    def _local_config_files(self):
        """Return config files directly inside ``local_config_dir``, sorted."""
        return sorted(
            entry
            for entry in self.local_config_dir.glob('*')
            if entry.is_file() and entry.suffix in self.CONFIG_SUFFIXES
        )

    def _snapshot_hashes(self):
        """Return ``{file name: MD5 digest}`` for the current local configs."""
        return {
            entry.name: self.calculate_file_hash(entry)
            for entry in self._local_config_files()
        }

    def ensure_local_dir(self):
        """Create the local config directory (and parents) if it is missing."""
        self.local_config_dir.mkdir(parents=True, exist_ok=True)

    def download_from_dataset(self):
        """Pull every config file from the Dataset into the local directory.

        Files are stored flat under ``local_config_dir`` using their base
        name. A local file whose content differs from the remote copy is
        renamed to a timestamped backup before being replaced; a local file
        that is already identical is left untouched. The digests of the
        restored files become the new change-detection baseline.

        Returns:
            bool: ``True`` if at least one config file was restored,
            ``False`` if none were or if the pull failed as a whole.
        """
        try:
            logger.info(f'从Dataset拉取配置: {self.dataset_repo}')
            # Staging directory that hf_hub_download writes into.
            self.repo_dir.mkdir(parents=True, exist_ok=True)

            files = list_repo_files(
                repo_id=self.dataset_repo,
                repo_type="dataset",
                token=self.hf_token,
            )
            if not files:
                logger.warning('Dataset中未找到配置文件')
                return False

            downloaded_count = 0
            restored = {}
            for file_name in files:
                if not file_name.endswith(self.CONFIG_SUFFIXES):
                    continue
                try:
                    local_path = hf_hub_download(
                        repo_id=self.dataset_repo,
                        filename=file_name,
                        repo_type="dataset",
                        token=self.hf_token,
                        local_dir=self.repo_dir,
                    )
                    config_file = Path(local_path)
                    dest_file = self.local_config_dir / config_file.name

                    remote_hash = self.calculate_file_hash(config_file)
                    unchanged = (
                        remote_hash is not None
                        and dest_file.is_file()
                        and self.calculate_file_hash(dest_file) == remote_hash
                    )
                    if not unchanged:
                        # Keep the previous version before overwriting it.
                        if dest_file.exists():
                            backup_file = self._backup_path(dest_file)
                            dest_file.rename(backup_file)
                            logger.debug(f'已备份原文件: {backup_file.name}')
                        shutil.copy2(config_file, dest_file)

                    logger.info(f'已恢复配置: {config_file.name}')
                    downloaded_count += 1
                    if remote_hash is not None:
                        restored[dest_file.name] = remote_hash
                except Exception as e:
                    # One bad file must not abort the rest of the pull.
                    logger.error(f'下载文件 {file_name} 失败: {e}')
                    continue

            # Files not listed here (local-only or failed) count as changed
            # on the next check and will therefore be uploaded.
            self._synced_hashes = restored
            logger.info(f'配置文件下载完成,共下载 {downloaded_count} 个文件')
            return downloaded_count > 0
        except Exception as e:
            logger.error(f'从Dataset拉取配置失败: {e}')
            return False

    def upload_to_dataset(self):
        """Push every local config file to the Dataset, one commit per file.

        The digests of the files that uploaded successfully become the new
        change-detection baseline, so a failed file is retried on the next
        cycle.

        Returns:
            bool: ``True`` if at least one file was uploaded, ``False`` if
            there was nothing to upload or every upload failed.
        """
        try:
            logger.info('推送配置变更到Dataset')

            config_files = self._local_config_files()
            if not config_files:
                logger.warning('没有配置文件需要上传')
                return False

            uploaded_count = 0
            pushed = {}
            for config_file in config_files:
                try:
                    # Hash before uploading so an edit made while the upload
                    # is in flight is still detected on the next check.
                    digest = self.calculate_file_hash(config_file)
                    upload_file(
                        path_or_fileobj=str(config_file),
                        path_in_repo=config_file.name,
                        repo_id=self.dataset_repo,
                        repo_type="dataset",
                        token=self.hf_token,
                        commit_message=f"自动同步配置: {config_file.name} - {time.strftime('%Y-%m-%d %H:%M:%S')}"
                    )
                    logger.info(f'已上传配置: {config_file.name}')
                    uploaded_count += 1
                    if digest is not None:
                        pushed[config_file.name] = digest
                except Exception as e:
                    logger.error(f'上传文件 {config_file.name} 失败: {e}')
                    continue

            self._synced_hashes = pushed
            logger.info(f'配置文件上传完成,共上传 {uploaded_count} 个文件')
            return uploaded_count > 0
        except Exception as e:
            logger.error(f'推送配置到Dataset失败: {e}')
            return False

    def config_changed(self):
        """Report whether local config files differ from the last sync.

        Returns:
            bool: ``False`` when there are no local config files or when
            their digests match the recorded baseline; ``True`` when no
            baseline exists yet, when any file was added, removed or
            modified, or when the check itself fails (so an upload is
            attempted rather than silently skipped).
        """
        try:
            current = self._snapshot_hashes()
            if not current:
                return False
            if self._synced_hashes is None:
                # Nothing synced by this instance yet: assume changes exist.
                return True
            return current != self._synced_hashes
        except Exception as e:
            logger.error(f'检查配置变更失败: {e}')
            return True

    def run_sync(self, mode='download'):
        """Run one sync pass.

        Args:
            mode: ``'download'`` to pull from the Dataset, or ``'upload'``
                to push local changes (skipped when nothing changed).

        Returns:
            bool: The result of the download or upload, ``True`` when an
            upload was skipped because nothing changed, and ``False`` for
            an unrecognised mode.
        """
        self.ensure_local_dir()
        if mode == 'download':
            return self.download_from_dataset()
        if mode == 'upload':
            if not self.config_changed():
                logger.info('配置无变更,跳过上传')
                return True
            return self.upload_to_dataset()
        return False

    def start_periodic_sync(self):
        """Upload local changes every ``sync_interval`` seconds, forever.

        The loop sleeps first and then runs an upload pass. An exception
        from a pass is logged and followed by a 60-second pause before the
        loop continues. This method never returns.
        """
        logger.info('启动定时同步服务')
        while True:
            try:
                time.sleep(self.sync_interval)
                self.run_sync('upload')
            except Exception as e:
                logger.error(f'定时同步失败: {e}')
                time.sleep(60)  # back off for a minute before retrying
def main(argv=None):
    """Command-line entry point: ``sync.py [download|upload|sync]``.

    Args:
        argv: Argument list in ``sys.argv`` form (program name first).
            Defaults to ``sys.argv`` when omitted.

    Modes:
        download: pull config files from the Dataset once.
        upload: push local config files once (if anything changed).
        sync: pull once, then keep uploading changes periodically until
            the process is stopped.

    Exits with status 1 on a usage error or an unexpected failure, and
    with status 130 when interrupted from the keyboard.
    """
    import sys

    args = sys.argv if argv is None else list(argv)
    if len(args) != 2 or args[1] not in ('download', 'upload', 'sync'):
        print('用法: python sync.py [download|upload|sync]')
        sys.exit(1)

    mode = args[1]
    try:
        sync = OpenClawConfigSync()
        if mode == 'sync':
            sync.run_sync('download')
            # The periodic loop never returns, so it runs in the main thread;
            # a daemon thread that is immediately joined adds nothing.
            sync.start_periodic_sync()
        else:
            sync.run_sync(mode)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the periodic loop.
        logger.info('收到中断信号,同步服务已停止')
        sys.exit(130)
    except Exception as e:
        logger.error(f'同步服务失败: {e}')
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()