hermes-pro / src /data_sync.py
tianmingyun's picture
Upload 5 files
4b2b0ac verified
#!/usr/bin/env python3
"""
Hermes Agent Data Sync Service
Handles data persistence to/from Hugging Face Dataset
"""
import argparse
import json
import os
import shutil
import sys
import tarfile
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, List

from huggingface_hub import HfApi, hf_hub_download, upload_folder
from loguru import logger
# File watching is optional: it is only enabled when watchdog can be imported.
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    WATCHDOG_AVAILABLE = True
except ImportError:
    WATCHDOG_AVAILABLE = False
    # Placeholders keep this module importable without watchdog:
    # ConfigFileHandler subclasses FileSystemEventHandler when the module is
    # loaded, so the name must be bound even though the watcher is never
    # started while WATCHDOG_AVAILABLE is False.
    Observer = None
    FileSystemEventHandler = object
    logger.warning("watchdog not installed, file change detection disabled")
class DatasetManager:
    """Synchronise the Hermes home directory with a Hugging Face dataset repo.

    A backup stages a copy of the tracked files under ``temp_dir`` and uploads
    that folder; a restore downloads a snapshot of the dataset and copies the
    tracked files back into ``hermes_home``.
    """

    # Backups may be requested from more than one thread (the file watcher and
    # the scheduled loop share one manager). Every backup wipes and refills the
    # same fixed staging directory, so they must not overlap.
    _sync_lock = threading.Lock()

    # Sub-directories always created inside the staging directory.
    _STAGING_DIRS = (
        'config',
        'personality',
        'memories',
        'skills',
        'sessions',
        'state',
        'logs',
        'cron',
    )

    # (path_mapping key, destination relative to the staging dir, is_directory)
    _BACKUP_LAYOUT = (
        ('config', 'config/config.yaml', False),
        ('env', 'config/.env', False),        # environment variables (sensitive)
        ('auth', 'config/auth.json', False),  # OAuth credentials
        ('soul', 'personality/SOUL.md', False),
        ('memories', 'memories', True),
        ('skills', 'skills', True),
        ('sessions', 'sessions', True),
        ('state_db', 'state/state.db', False),
        ('logs', 'logs', True),
        ('cron', 'cron', True),
    )

    def __init__(self, dataset_repo: Optional[str] = None, token: Optional[str] = None):
        """Create a manager.

        Args:
            dataset_repo: Dataset repo id; falls back to ``HF_DATASET_REPO``.
            token: Hub token; falls back to ``HF_TOKEN`` and then
                ``HUGGING_FACE_HUB_TOKEN``.
        """
        self.dataset_repo = dataset_repo or os.environ.get('HF_DATASET_REPO')
        self.token = token or os.environ.get('HF_TOKEN') or os.environ.get('HUGGING_FACE_HUB_TOKEN')
        self.api = HfApi(token=self.token)
        self.hermes_home = Path(os.environ.get('HERMES_HOME', '/data/.hermes'))
        self.temp_dir = Path('/tmp/hermes_sync')

        # Logical name -> location inside the Hermes home directory.
        self.path_mapping = {
            'config': self.hermes_home / 'config.yaml',
            'env': self.hermes_home / '.env',
            'auth': self.hermes_home / 'auth.json',
            'soul': self.hermes_home / 'SOUL.md',
            'memories': self.hermes_home / 'memories',
            'skills': self.hermes_home / 'skills',
            'sessions': self.hermes_home / 'sessions',
            'state_db': self.hermes_home / 'state.db',
            'logs': self.hermes_home / 'logs',
            'cron': self.hermes_home / 'cron',
        }

    def validate(self) -> bool:
        """Check the configuration.

        Returns:
            ``False`` when no dataset repo is configured, otherwise ``True``.
            A missing token only produces a warning.
        """
        if not self.dataset_repo:
            logger.error("HF_DATASET_REPO not set")
            return False
        if not self.token:
            logger.warning("HF_TOKEN not set, will try public dataset")
        return True

    def prepare_backup_data(self) -> Path:
        """Stage a copy of every tracked item that exists under ``temp_dir``.

        Any previous staging directory is removed first. A ``metadata.json``
        file is written next to the copied data.

        Returns:
            The staging directory (``self.temp_dir``).

        Raises:
            Exception: Any error raised while copying or writing metadata is
                logged and re-raised.
        """
        logger.info("Preparing backup data...")

        # Start from an empty staging directory.
        if self.temp_dir.exists():
            shutil.rmtree(self.temp_dir)
        self.temp_dir.mkdir(parents=True)
        for dir_name in self._STAGING_DIRS:
            (self.temp_dir / dir_name).mkdir()

        try:
            for key, rel_path, is_dir in self._BACKUP_LAYOUT:
                src = self.path_mapping[key]
                if not src.exists():
                    continue
                dst = self.temp_dir / rel_path
                if is_dir:
                    # The destination was pre-created above, hence dirs_exist_ok.
                    shutil.copytree(src, dst, dirs_exist_ok=True)
                else:
                    shutil.copy2(src, dst)

            metadata = {
                'timestamp': datetime.now().isoformat(),
                'version': '0.9.0',
                'hermes_home': str(self.hermes_home)
            }
            with open(self.temp_dir / 'metadata.json', 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2)

            logger.success(f"Backup prepared at {self.temp_dir}")
            return self.temp_dir
        except Exception as e:
            logger.error(f"Failed to prepare backup: {e}")
            raise

    def upload_to_dataset(self, force: bool = False) -> bool:
        """Stage the current data and upload it to the dataset repo.

        Staging and upload run under a lock so that concurrent callers cannot
        delete the staging directory while another upload is reading it.

        Args:
            force: Accepted for interface compatibility; currently not used.

        Returns:
            ``True`` on success, ``False`` if staging or the upload failed
            (the error is logged, not raised).
        """
        try:
            with self._sync_lock:
                backup_dir = self.prepare_backup_data()
                logger.info(f"Uploading to dataset: {self.dataset_repo}")
                self.api.upload_folder(
                    folder_path=str(backup_dir),
                    repo_id=self.dataset_repo,
                    repo_type="dataset",
                    commit_message=f"Hermes Agent backup - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                )
            logger.success("Backup uploaded successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to upload to dataset: {e}")
            return False

    def download_from_dataset(self) -> bool:
        """Download a snapshot of the dataset repo and restore it.

        Returns:
            ``True`` on success, ``False`` if the download or the restore
            raised (the error is logged, not raised).
        """
        try:
            logger.info(f"Downloading from dataset: {self.dataset_repo}")

            # Fresh scratch directory for the snapshot.
            download_dir = Path('/tmp/hermes_download')
            if download_dir.exists():
                shutil.rmtree(download_dir)
            download_dir.mkdir(parents=True)

            self.api.snapshot_download(
                repo_id=self.dataset_repo,
                repo_type="dataset",
                local_dir=str(download_dir)
            )
            logger.success("Download completed")

            self.restore_from_download(download_dir)
            return True
        except Exception as e:
            logger.error(f"Failed to download from dataset: {e}")
            return False

    def restore_from_download(self, download_dir: Path):
        """Copy a downloaded snapshot back into the Hermes home directory.

        Note: by default config.yaml is NOT restored, because entrypoint.sh
        regenerates the correct config.yaml from environment variables.
        Restoring an old config.yaml would overwrite the model configuration
        (for example minimaxai/minimax-m2.7 being replaced by an older model).
        Set ``SKIP_CONFIG_RESTORE`` to anything other than true/1/yes to
        restore it anyway. When it is skipped and no local config.yaml exists,
        the backed-up copy is still used as an initial value.

        Args:
            download_dir: Root of the downloaded snapshot.
        """
        logger.info("Restoring data to Hermes home...")

        self.hermes_home.mkdir(parents=True, exist_ok=True)

        skip_restore = os.environ.get('SKIP_CONFIG_RESTORE', 'true').lower() in ('true', '1', 'yes')

        # Path inside the snapshot -> destination in the Hermes home directory.
        restore_mapping = {
            'config/.env': self.path_mapping['env'],
            'config/auth.json': self.path_mapping['auth'],
            'personality/SOUL.md': self.path_mapping['soul'],
            'memories': self.path_mapping['memories'],
            'skills': self.path_mapping['skills'],
            'sessions': self.path_mapping['sessions'],
            'state/state.db': self.path_mapping['state_db'],
            'logs': self.path_mapping['logs'],
            'cron': self.path_mapping['cron'],
        }

        if not skip_restore:
            # Explicitly requested: treat config.yaml like every other entry.
            restore_mapping['config/config.yaml'] = self.path_mapping['config']
        else:
            logger.info("Skipping config.yaml restore (will be regenerated by entrypoint.sh)")
            # Only seed config.yaml from the backup when none exists locally.
            if not self.path_mapping['config'].exists():
                src = download_dir / 'config' / 'config.yaml'
                if src.exists():
                    logger.info("No local config.yaml found, restoring from backup as initial value")
                    self.path_mapping['config'].parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(src, self.path_mapping['config'])

        for src_rel, dst in restore_mapping.items():
            src = download_dir / src_rel
            if not src.exists():
                logger.warning(f"Not found in backup: {src_rel}")
                continue
            # A failure on one entry is logged and does not stop the others.
            try:
                if src.is_file():
                    dst.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(src, dst)
                    logger.info(f"Restored: {src_rel}")
                elif src.is_dir():
                    # Replace the existing directory wholesale.
                    if dst.exists():
                        shutil.rmtree(dst)
                    shutil.copytree(src, dst)
                    logger.info(f"Restored directory: {src_rel}")
            except Exception as e:
                logger.error(f"Failed to restore {src_rel}: {e}")

        logger.success("Data restoration completed")
class ConfigFileHandler(FileSystemEventHandler):
    """File-change handler that backs up to the dataset when a key config file is modified."""

    # Exact file names whose modification triggers an immediate backup.
    WATCHED_FILES = frozenset({'config.yaml', '.env', 'auth.json'})

    def __init__(self, manager: DatasetManager):
        """Store the manager used for backups and initialise the cooldown state."""
        super().__init__()
        self.manager = manager
        self.last_backup_time = 0
        self.backup_cooldown = 5  # seconds during which a repeated backup is suppressed

    def on_modified(self, event):
        """Trigger an immediate backup when a watched config file is modified.

        Directory events, files whose name is not in ``WATCHED_FILES`` and
        events arriving within ``backup_cooldown`` seconds of the last backup
        attempt are ignored.
        """
        if event.is_directory:
            return

        # Compare the exact file name: a suffix test would also match files
        # such as "backup.env" or "myconfig.yaml".
        file_name = os.path.basename(os.fsdecode(event.src_path))
        if file_name not in self.WATCHED_FILES:
            return

        current_time = time.time()
        if current_time - self.last_backup_time <= self.backup_cooldown:
            return

        logger.info(f"Config file changed: {event.src_path}")
        logger.info("Triggering immediate backup...")
        try:
            uploaded = self.manager.upload_to_dataset()
        except Exception as e:
            logger.error(f"Immediate backup failed: {e}")
            return

        self.last_backup_time = current_time
        # upload_to_dataset reports failure through its return value, so the
        # result must be checked before announcing success.
        if not uploaded:
            logger.error("Immediate backup failed: upload was not successful")
            return

        logger.success("Immediate backup completed")
        self._trigger_reload()

    def _trigger_reload(self):
        """Tell the operator how to apply the saved configuration.

        Hermes currently has no config reload command, so this only logs a
        hint; the configuration takes effect on the next Space restart.
        """
        logger.info("Configuration saved. Please restart Space to apply changes immediately.")
def run_daemon():
    """Run the background sync daemon: periodic backups plus optional file watching.

    The backup interval is read from ``SYNC_INTERVAL`` (seconds, default 60).
    A value that is not a positive integer is replaced by the default, because
    a negative value would make every ``time.sleep`` call fail and a zero
    value would upload continuously. The process exits with status 1 when the
    manager configuration is invalid. The loop runs until interrupted with
    ``KeyboardInterrupt``; the file watcher, if started, is stopped on exit.
    """
    logger.info("Starting data sync daemon...")

    default_interval = 60
    raw_interval = os.environ.get('SYNC_INTERVAL', str(default_interval))
    try:
        sync_interval = int(raw_interval)
    except ValueError:
        sync_interval = 0
    if sync_interval <= 0:
        logger.warning(
            f"Invalid SYNC_INTERVAL {raw_interval!r}, falling back to {default_interval} seconds"
        )
        sync_interval = default_interval

    manager = DatasetManager()
    if not manager.validate():
        logger.error("Configuration invalid, exiting")
        sys.exit(1)
    logger.info(f"Sync interval: {sync_interval} seconds")

    # Start the file watcher when watchdog is available.
    observer = None
    if WATCHDOG_AVAILABLE:
        try:
            logger.info("Starting file watcher for real-time sync...")
            event_handler = ConfigFileHandler(manager)
            observer = Observer()
            observer.schedule(event_handler, str(manager.hermes_home), recursive=False)
            observer.start()
            logger.success("File watcher started - config changes will trigger immediate backup")
        except Exception as e:
            logger.error(f"Failed to start file watcher: {e}")
            logger.warning("Falling back to scheduled sync only")
            observer = None
    else:
        logger.warning("Watchdog not available, using scheduled sync only")

    try:
        while True:
            try:
                time.sleep(sync_interval)
                logger.info("Performing scheduled backup...")
                manager.upload_to_dataset()
            except KeyboardInterrupt:
                logger.info("Daemon stopped")
                break
            except Exception as e:
                # Keep the daemon alive; the next cycle will retry.
                logger.error(f"Sync error: {e}")
    finally:
        # Shut the file watcher down cleanly.
        if observer:
            logger.info("Stopping file watcher...")
            observer.stop()
            observer.join()
            logger.info("File watcher stopped")
def main():
    """Command-line entry point.

    Parses the action (``backup``, ``restore`` or ``daemon``), validates the
    manager configuration and dispatches. ``backup`` and ``restore`` exit with
    status 0 on success and 1 on failure; an invalid configuration exits
    with status 1.
    """
    cli = argparse.ArgumentParser(description='Hermes Agent Data Sync')
    cli.add_argument(
        'action',
        choices=['backup', 'restore', 'daemon'],
        help='Action to perform',
    )
    cli.add_argument(
        '--force', '-f',
        action='store_true',
        help='Force backup even if no changes',
    )
    options = cli.parse_args()

    sync_manager = DatasetManager()
    if not sync_manager.validate():
        logger.error("Configuration invalid")
        sys.exit(1)

    if options.action == 'daemon':
        run_daemon()
        return

    # Only 'backup' and 'restore' remain; both report success as a bool.
    if options.action == 'backup':
        succeeded = sync_manager.upload_to_dataset(force=options.force)
    else:
        succeeded = sync_manager.download_from_dataset()
    sys.exit(0 if succeeded else 1)


if __name__ == '__main__':
    main()