| |
| """ |
| Hermes Agent Data Sync Service |
| Handles data persistence to/from Hugging Face Dataset |
| """ |
|
|
| import os |
| import sys |
| import time |
| import json |
| import shutil |
| import tarfile |
| import argparse |
| from pathlib import Path |
| from datetime import datetime |
| from typing import Optional, Dict, List |
|
|
| from huggingface_hub import HfApi, hf_hub_download, upload_folder |
| from loguru import logger |
|
|
| |
| try: |
| from watchdog.observers import Observer |
| from watchdog.events import FileSystemEventHandler |
| WATCHDOG_AVAILABLE = True |
| except ImportError: |
| WATCHDOG_AVAILABLE = False |
| logger.warning("watchdog not installed, file change detection disabled") |
|
|
|
|
| class DatasetManager: |
| """Manages data synchronization with Hugging Face Dataset""" |
| |
| def __init__(self, dataset_repo: Optional[str] = None, token: Optional[str] = None): |
| self.dataset_repo = dataset_repo or os.environ.get('HF_DATASET_REPO') |
| self.token = token or os.environ.get('HF_TOKEN') or os.environ.get('HUGGING_FACE_HUB_TOKEN') |
| self.api = HfApi(token=self.token) |
| self.hermes_home = Path(os.environ.get('HERMES_HOME', '/data/.hermes')) |
| self.temp_dir = Path('/tmp/hermes_sync') |
| |
| |
| self.path_mapping = { |
| 'config': self.hermes_home / 'config.yaml', |
| 'env': self.hermes_home / '.env', |
| 'auth': self.hermes_home / 'auth.json', |
| 'soul': self.hermes_home / 'SOUL.md', |
| 'memories': self.hermes_home / 'memories', |
| 'skills': self.hermes_home / 'skills', |
| 'sessions': self.hermes_home / 'sessions', |
| 'state_db': self.hermes_home / 'state.db', |
| 'logs': self.hermes_home / 'logs', |
| 'cron': self.hermes_home / 'cron', |
| 'webui_token': Path('/data/.hermes-web-ui') / '.token', |
| 'image_cache': self.hermes_home / 'image_cache', |
| 'baoyu_skills': Path('/home/appuser/.baoyu-skills'), |
| } |
| |
| def validate(self) -> bool: |
| """验证配置是否正确""" |
| if not self.dataset_repo: |
| logger.error("HF_DATASET_REPO not set") |
| return False |
| |
| if not self.token: |
| logger.warning("HF_TOKEN not set, will try public dataset") |
| |
| return True |
| |
| def prepare_backup_data(self) -> Path: |
| """准备备份数据到临时目录""" |
| logger.info("Preparing backup data...") |
| |
| |
| if self.temp_dir.exists(): |
| shutil.rmtree(self.temp_dir) |
| self.temp_dir.mkdir(parents=True) |
| |
| |
| (self.temp_dir / 'config').mkdir() |
| (self.temp_dir / 'personality').mkdir() |
| (self.temp_dir / 'memories').mkdir() |
| (self.temp_dir / 'skills').mkdir() |
| (self.temp_dir / 'sessions').mkdir() |
| (self.temp_dir / 'state').mkdir() |
| (self.temp_dir / 'logs').mkdir() |
| (self.temp_dir / 'cron').mkdir() |
| (self.temp_dir / 'webui').mkdir() |
| (self.temp_dir / 'image_cache').mkdir() |
| (self.temp_dir / 'baoyu_skills').mkdir() |
| |
| |
| try: |
| |
| if self.path_mapping['config'].exists(): |
| shutil.copy2(self.path_mapping['config'], self.temp_dir / 'config' / 'config.yaml') |
| |
| |
| if self.path_mapping['env'].exists(): |
| shutil.copy2(self.path_mapping['env'], self.temp_dir / 'config' / '.env') |
| |
| |
| if self.path_mapping['auth'].exists(): |
| shutil.copy2(self.path_mapping['auth'], self.temp_dir / 'config' / 'auth.json') |
| |
| |
| if self.path_mapping['soul'].exists(): |
| shutil.copy2(self.path_mapping['soul'], self.temp_dir / 'personality' / 'SOUL.md') |
| |
| |
| if self.path_mapping['memories'].exists(): |
| shutil.copytree(self.path_mapping['memories'], self.temp_dir / 'memories', dirs_exist_ok=True) |
| |
| |
| if self.path_mapping['skills'].exists(): |
| shutil.copytree(self.path_mapping['skills'], self.temp_dir / 'skills', dirs_exist_ok=True) |
| |
| |
| if self.path_mapping['sessions'].exists(): |
| shutil.copytree(self.path_mapping['sessions'], self.temp_dir / 'sessions', dirs_exist_ok=True) |
| |
| |
| if self.path_mapping['state_db'].exists(): |
| shutil.copy2(self.path_mapping['state_db'], self.temp_dir / 'state' / 'state.db') |
| |
| |
| if self.path_mapping['logs'].exists(): |
| shutil.copytree(self.path_mapping['logs'], self.temp_dir / 'logs', dirs_exist_ok=True) |
| |
| |
| if self.path_mapping['cron'].exists(): |
| shutil.copytree(self.path_mapping['cron'], self.temp_dir / 'cron', dirs_exist_ok=True) |
| |
| |
| if self.path_mapping['image_cache'].exists(): |
| shutil.copytree(self.path_mapping['image_cache'], self.temp_dir / 'image_cache', dirs_exist_ok=True) |
| |
| |
| if self.path_mapping['baoyu_skills'].exists(): |
| shutil.copytree(self.path_mapping['baoyu_skills'], self.temp_dir / 'baoyu_skills', dirs_exist_ok=True) |
| |
| |
| if self.path_mapping['webui_token'].exists(): |
| (self.temp_dir / 'webui').mkdir(exist_ok=True) |
| shutil.copy2(self.path_mapping['webui_token'], self.temp_dir / 'webui' / '.token') |
| |
| |
| |
| logger.info("Fixing permissions in temp backup dir...") |
| import stat |
| for root, dirs, files in os.walk(self.temp_dir): |
| for d in dirs: |
| dir_path = os.path.join(root, d) |
| try: |
| os.chmod(dir_path, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) |
| except Exception: |
| pass |
| for f in files: |
| file_path = os.path.join(root, f) |
| try: |
| os.chmod(file_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH) |
| except Exception: |
| pass |
| |
| |
| metadata = { |
| 'timestamp': datetime.now().isoformat(), |
| 'version': '0.10.0', |
| 'hermes_home': str(self.hermes_home) |
| } |
| with open(self.temp_dir / 'metadata.json', 'w') as f: |
| json.dump(metadata, f, indent=2) |
| |
| logger.success(f"Backup prepared at {self.temp_dir}") |
| return self.temp_dir |
| |
| except Exception as e: |
| logger.error(f"Failed to prepare backup: {e}") |
| raise |
| |
| def upload_to_dataset(self, force: bool = False) -> bool: |
| """上传数据到 Hugging Face Dataset""" |
| try: |
| backup_dir = self.prepare_backup_data() |
| |
| logger.info(f"Uploading to dataset: {self.dataset_repo}") |
| |
| |
| self.api.upload_folder( |
| folder_path=str(backup_dir), |
| repo_id=self.dataset_repo, |
| repo_type="dataset", |
| commit_message=f"Hermes Agent backup - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" |
| ) |
| |
| logger.success("Backup uploaded successfully") |
| return True |
| |
| except Exception as e: |
| logger.error(f"Failed to upload to dataset: {e}") |
| return False |
| |
| def download_from_dataset(self) -> bool: |
| """从 Hugging Face Dataset 下载数据""" |
| try: |
| logger.info(f"Downloading from dataset: {self.dataset_repo}") |
| |
| |
| download_dir = Path('/tmp/hermes_download') |
| if download_dir.exists(): |
| shutil.rmtree(download_dir) |
| download_dir.mkdir(parents=True) |
| |
| |
| self.api.snapshot_download( |
| repo_id=self.dataset_repo, |
| repo_type="dataset", |
| local_dir=str(download_dir) |
| ) |
| |
| logger.success("Download completed") |
| |
| |
| self.restore_from_download(download_dir) |
| return True |
| |
| except Exception as e: |
| logger.error(f"Failed to download from dataset: {e}") |
| return False |
| |
| def restore_from_download(self, download_dir: Path): |
| """从下载的目录恢复数据 |
| |
| 注意: config.yaml 在恢复时被跳过,因为 entrypoint.sh 会根据环境变量 |
| 重新生成正确的 config.yaml。如果恢复旧的 config.yaml,会导致模型 |
| 配置被覆盖(例如 minimaxai/minimax-m2.7 被替换为旧模型)。 |
| """ |
| logger.info("Restoring data to Hermes home...") |
| |
| |
| self.hermes_home.mkdir(parents=True, exist_ok=True) |
| |
| |
| |
| |
| |
| |
| |
| skip_restore = os.environ.get('SKIP_CONFIG_RESTORE', 'true').lower() in ('true', '1', 'yes') |
| |
| |
| |
| |
| |
| if skip_restore: |
| skip_paths = { |
| 'config/config.yaml', |
| 'baoyu_skills', |
| } |
| logger.info(f"SKIP_CONFIG_RESTORE=true, skipping: {', '.join(skip_paths)}") |
| else: |
| skip_paths = set() |
| logger.info("SKIP_CONFIG_RESTORE=false, restoring all backed-up configurations") |
| |
| restore_mapping = { |
| 'config/.env': self.path_mapping['env'], |
| 'config/auth.json': self.path_mapping['auth'], |
| 'personality/SOUL.md': self.path_mapping['soul'], |
| 'memories': self.path_mapping['memories'], |
| 'skills': self.path_mapping['skills'], |
| 'sessions': self.path_mapping['sessions'], |
| 'state/state.db': self.path_mapping['state_db'], |
| 'logs': self.path_mapping['logs'], |
| 'cron': self.path_mapping['cron'], |
| 'webui/.token': self.path_mapping['webui_token'], |
| 'image_cache': self.path_mapping['image_cache'], |
| 'baoyu_skills': self.path_mapping['baoyu_skills'], |
| } |
| |
| |
| |
| if not skip_restore: |
| restore_mapping['config/config.yaml'] = self.path_mapping['config'] |
| else: |
| |
| restored_path = self.hermes_home / 'config.yaml.restored' |
| src = download_dir / 'config' / 'config.yaml' |
| if src.exists(): |
| shutil.copy2(src, restored_path) |
| logger.info("Restored config.yaml to config.yaml.restored for merge") |
| |
| for src_rel, dst in restore_mapping.items(): |
| |
| if src_rel in skip_paths: |
| logger.info(f"Skipping restore of {src_rel} (will be regenerated by entrypoint.sh)") |
| continue |
| |
| src = download_dir / src_rel |
| if src.exists(): |
| try: |
| if src.is_file(): |
| dst.parent.mkdir(parents=True, exist_ok=True) |
| shutil.copy2(src, dst) |
| logger.info(f"Restored: {src_rel}") |
| elif src.is_dir(): |
| if dst.exists(): |
| shutil.rmtree(dst) |
| shutil.copytree(src, dst) |
| logger.info(f"Restored directory: {src_rel}") |
| except Exception as e: |
| logger.error(f"Failed to restore {src_rel}: {e}") |
| else: |
| logger.warning(f"Not found in backup: {src_rel}") |
| |
| logger.success("Data restoration completed") |
|
|
|
|
| class ConfigFileHandler(FileSystemEventHandler): |
| """配置文件变化处理器 - 实时同步到 Dataset 并触发重载""" |
| |
| |
| STARTUP_GRACE_PERIOD = 30 |
| |
| def __init__(self, manager: DatasetManager): |
| self.manager = manager |
| self.last_backup_time = 0 |
| self.backup_cooldown = 5 |
| self.start_time = time.time() |
| self._startup_logged = False |
| |
| def on_modified(self, event): |
| """文件被修改时触发""" |
| if event.is_directory: |
| return |
| |
| |
| elapsed = time.time() - self.start_time |
| if elapsed < self.STARTUP_GRACE_PERIOD: |
| if not self._startup_logged: |
| logger.info(f"In startup grace period ({int(self.STARTUP_GRACE_PERIOD - elapsed)}s remaining), skipping backup for: {event.src_path}") |
| self._startup_logged = True |
| return |
| |
| |
| watched_files = ['config.yaml', '.env', 'auth.json'] |
| if any(event.src_path.endswith(f) for f in watched_files): |
| current_time = time.time() |
| if current_time - self.last_backup_time > self.backup_cooldown: |
| logger.info(f"Config file changed: {event.src_path}") |
| logger.info("Triggering immediate backup...") |
| try: |
| self.manager.upload_to_dataset() |
| self.last_backup_time = current_time |
| logger.success("Immediate backup completed") |
| |
| |
| self._trigger_reload() |
| |
| except Exception as e: |
| logger.error(f"Immediate backup failed: {e}") |
| |
| def _trigger_reload(self): |
| """尝试触发 Hermes 配置重载""" |
| |
| |
| logger.info("Configuration saved. Please restart Space to apply changes immediately.") |
|
|
|
|
| def run_daemon(): |
| """后台守护进程模式 - 定期同步 + 实时文件监听""" |
| logger.info("Starting data sync daemon...") |
| |
| sync_interval = int(os.environ.get('SYNC_INTERVAL', '60')) |
| manager = DatasetManager() |
| |
| if not manager.validate(): |
| logger.error("Configuration invalid, exiting") |
| sys.exit(1) |
| |
| logger.info(f"Sync interval: {sync_interval} seconds") |
| |
| |
| observer = None |
| if WATCHDOG_AVAILABLE: |
| try: |
| logger.info("Starting file watcher for real-time sync...") |
| event_handler = ConfigFileHandler(manager) |
| observer = Observer() |
| observer.schedule(event_handler, str(manager.hermes_home), recursive=False) |
| observer.start() |
| logger.success("File watcher started - config changes will trigger immediate backup") |
| except Exception as e: |
| logger.error(f"Failed to start file watcher: {e}") |
| logger.warning("Falling back to scheduled sync only") |
| observer = None |
| else: |
| logger.warning("Watchdog not available, using scheduled sync only") |
| |
| try: |
| while True: |
| try: |
| time.sleep(sync_interval) |
| logger.info("Performing scheduled backup...") |
| manager.upload_to_dataset() |
| except KeyboardInterrupt: |
| logger.info("Daemon stopped") |
| break |
| except Exception as e: |
| logger.error(f"Sync error: {e}") |
| finally: |
| |
| if observer: |
| logger.info("Stopping file watcher...") |
| observer.stop() |
| observer.join() |
| logger.info("File watcher stopped") |
|
|
|
|
| def main(): |
| parser = argparse.ArgumentParser(description='Hermes Agent Data Sync') |
| parser.add_argument('action', choices=['backup', 'restore', 'daemon'], |
| help='Action to perform') |
| parser.add_argument('--force', '-f', action='store_true', |
| help='Force backup even if no changes') |
| |
| args = parser.parse_args() |
| |
| manager = DatasetManager() |
| |
| if not manager.validate(): |
| logger.error("Configuration invalid") |
| sys.exit(1) |
| |
| if args.action == 'backup': |
| success = manager.upload_to_dataset(force=args.force) |
| sys.exit(0 if success else 1) |
| |
| elif args.action == 'restore': |
| success = manager.download_from_dataset() |
| sys.exit(0 if success else 1) |
| |
| elif args.action == 'daemon': |
| run_daemon() |
|
|
|
|
| if __name__ == '__main__': |
| main() |
|
|