File size: 14,424 Bytes
9b05b31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
faca88c
 
 
 
 
 
 
 
 
9b05b31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4b2b0ac
 
 
 
 
 
9b05b31
 
 
 
 
4b2b0ac
 
 
9b05b31
 
 
 
 
 
 
 
 
 
 
 
4b2b0ac
 
 
 
 
 
 
 
 
 
 
 
 
9b05b31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
faca88c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a2a8d76
 
 
faca88c
 
9b05b31
faca88c
9b05b31
 
faca88c
9b05b31
 
 
 
 
 
 
 
faca88c
 
 
9b05b31
faca88c
 
 
 
 
 
9b05b31
faca88c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b05b31
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
#!/usr/bin/env python3
"""
Hermes Agent Data Sync Service
Handles data persistence to/from Hugging Face Dataset
"""

import argparse
import json
import os
import shutil
import sys
import tarfile
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, List

from huggingface_hub import HfApi, hf_hub_download, upload_folder
from loguru import logger

# File watching is optional: when watchdog is missing, WATCHDOG_AVAILABLE is
# False and callers are expected to skip the watcher.
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
    WATCHDOG_AVAILABLE = True
except ImportError:
    WATCHDOG_AVAILABLE = False
    # Bind fallbacks so the module still imports. ConfigFileHandler subclasses
    # FileSystemEventHandler at class-definition time; leaving the name
    # undefined would raise NameError on import and make watchdog mandatory.
    Observer = None
    FileSystemEventHandler = object
    logger.warning("watchdog not installed, file change detection disabled")


class DatasetManager:
    """Synchronise the Hermes home directory with a Hugging Face dataset repo.

    A backup copies the entries of ``path_mapping`` into a staging directory
    laid out according to ``_BACKUP_LAYOUT`` and uploads that directory.
    A restore downloads the dataset and copies the same layout back into
    ``hermes_home``.

    NOTE(security): ``.env`` and ``auth.json`` are part of the backup, so the
    target dataset repo should be private.
    """

    # key in ``path_mapping`` -> path relative to the backup root.
    # Shared by backup and restore so both directions always agree.
    _BACKUP_LAYOUT = {
        'config': 'config/config.yaml',
        'env': 'config/.env',
        'auth': 'config/auth.json',
        'soul': 'personality/SOUL.md',
        'memories': 'memories',
        'skills': 'skills',
        'sessions': 'sessions',
        'state_db': 'state/state.db',
        'logs': 'logs',
        'cron': 'cron',
    }

    # Top-level directories always created inside the staging directory.
    _BACKUP_DIRS = (
        'config', 'personality', 'memories', 'skills',
        'sessions', 'state', 'logs', 'cron',
    )

    # The staging directory is a fixed path, so uploads started from different
    # threads (file watcher vs. scheduled loop) must not overlap: one would
    # delete the directory the other is still filling or uploading.
    _sync_lock = threading.Lock()

    def __init__(self, dataset_repo: Optional[str] = None, token: Optional[str] = None):
        """Read configuration from the arguments, falling back to the environment.

        Args:
            dataset_repo: Dataset repo id; defaults to ``$HF_DATASET_REPO``.
            token: Hub token; defaults to ``$HF_TOKEN`` and then
                ``$HUGGING_FACE_HUB_TOKEN``.
        """
        self.dataset_repo = dataset_repo or os.environ.get('HF_DATASET_REPO')
        self.token = token or os.environ.get('HF_TOKEN') or os.environ.get('HUGGING_FACE_HUB_TOKEN')
        self.api = HfApi(token=self.token)
        self.hermes_home = Path(os.environ.get('HERMES_HOME', '/data/.hermes'))
        self.temp_dir = Path('/tmp/hermes_sync')

        # Local location of every item that takes part in a backup.
        self.path_mapping = {
            'config': self.hermes_home / 'config.yaml',
            'env': self.hermes_home / '.env',
            'auth': self.hermes_home / 'auth.json',
            'soul': self.hermes_home / 'SOUL.md',
            'memories': self.hermes_home / 'memories',
            'skills': self.hermes_home / 'skills',
            'sessions': self.hermes_home / 'sessions',
            'state_db': self.hermes_home / 'state.db',
            'logs': self.hermes_home / 'logs',
            'cron': self.hermes_home / 'cron',
        }

    def validate(self) -> bool:
        """Return True when a dataset repo is configured.

        A missing token only produces a warning; a missing repo is an error.
        """
        if not self.dataset_repo:
            logger.error("HF_DATASET_REPO not set")
            return False

        if not self.token:
            logger.warning("HF_TOKEN not set, will try public dataset")

        return True

    def prepare_backup_data(self) -> Path:
        """Stage a fresh copy of all backed-up items in ``self.temp_dir``.

        Any previous staging directory is removed first. Items that do not
        exist locally are skipped. A ``metadata.json`` with the timestamp,
        version and Hermes home path is written next to the data.

        Returns:
            The staging directory (``self.temp_dir``).

        Raises:
            Exception: whatever the copy or the metadata write raised; it is
                logged and re-raised.
        """
        logger.info("Preparing backup data...")

        # Start from an empty staging directory. It holds secrets (.env,
        # auth.json), so keep it private to the current user.
        if self.temp_dir.exists():
            shutil.rmtree(self.temp_dir)
        self.temp_dir.mkdir(parents=True, mode=0o700)

        for dir_name in self._BACKUP_DIRS:
            (self.temp_dir / dir_name).mkdir()

        try:
            for key, rel_path in self._BACKUP_LAYOUT.items():
                src = self.path_mapping[key]
                if not src.exists():
                    continue
                dst = self.temp_dir / rel_path
                if src.is_dir():
                    # The destination directory was pre-created above.
                    shutil.copytree(src, dst, dirs_exist_ok=True)
                else:
                    shutil.copy2(src, dst)

            metadata = {
                'timestamp': datetime.now().isoformat(),
                'version': '0.9.0',
                'hermes_home': str(self.hermes_home)
            }
            with open(self.temp_dir / 'metadata.json', 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2)

            logger.success(f"Backup prepared at {self.temp_dir}")
            return self.temp_dir

        except Exception as e:
            logger.error(f"Failed to prepare backup: {e}")
            raise

    def upload_to_dataset(self, force: bool = False) -> bool:
        """Stage the backup and upload it to the dataset repo.

        Only one upload runs at a time per process; concurrent callers wait.
        The staging directory is removed afterwards whether or not the upload
        succeeded.

        Args:
            force: Accepted for CLI compatibility; currently not used, every
                call uploads.

        Returns:
            True on success, False if staging or uploading failed (the error
            is logged, not raised).
        """
        with self._sync_lock:
            try:
                backup_dir = self.prepare_backup_data()

                logger.info(f"Uploading to dataset: {self.dataset_repo}")

                self.api.upload_folder(
                    folder_path=str(backup_dir),
                    repo_id=self.dataset_repo,
                    repo_type="dataset",
                    commit_message=f"Hermes Agent backup - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                )

                logger.success("Backup uploaded successfully")
                return True

            except Exception as e:
                logger.error(f"Failed to upload to dataset: {e}")
                return False

            finally:
                # Do not leave plaintext copies of secrets behind in /tmp.
                shutil.rmtree(self.temp_dir, ignore_errors=True)

    def download_from_dataset(self) -> bool:
        """Download the dataset snapshot and restore it into the Hermes home.

        The download directory is removed afterwards whether or not the
        restore succeeded.

        Returns:
            True when the download and restore pass completed, False if the
            download (or an unhandled restore step) failed; the error is
            logged, not raised.
        """
        download_dir = Path('/tmp/hermes_download')
        try:
            logger.info(f"Downloading from dataset: {self.dataset_repo}")

            # Start from an empty, user-private download directory.
            if download_dir.exists():
                shutil.rmtree(download_dir)
            download_dir.mkdir(parents=True, mode=0o700)

            self.api.snapshot_download(
                repo_id=self.dataset_repo,
                repo_type="dataset",
                local_dir=str(download_dir)
            )

            logger.success("Download completed")

            self.restore_from_download(download_dir)
            return True

        except Exception as e:
            logger.error(f"Failed to download from dataset: {e}")
            return False

        finally:
            # The snapshot contains the backed-up secrets; do not keep it.
            shutil.rmtree(download_dir, ignore_errors=True)

    def restore_from_download(self, download_dir: Path):
        """Copy a downloaded backup into the Hermes home directory.

        ``config.yaml`` is skipped by default (``SKIP_CONFIG_RESTORE``
        defaults to true). Per the original author's note, entrypoint.sh
        regenerates it from environment variables, and restoring an old copy
        would overwrite the model configuration. When skipping, the backed-up
        ``config.yaml`` is still used as an initial value if no local one
        exists.

        Files are overwritten in place; directories are replaced wholesale.
        A failure on one item is logged and does not stop the others.

        Args:
            download_dir: Root of the downloaded backup.
        """
        logger.info("Restoring data to Hermes home...")

        download_dir = Path(download_dir)
        self.hermes_home.mkdir(parents=True, exist_ok=True)

        skip_restore = os.environ.get('SKIP_CONFIG_RESTORE', 'true').lower() in ('true', '1', 'yes')

        # Everything except config.yaml is always restored.
        restore_keys = [key for key in self._BACKUP_LAYOUT if key != 'config']

        if not skip_restore:
            restore_keys.append('config')
        else:
            logger.info("Skipping config.yaml restore (will be regenerated by entrypoint.sh)")
            # Seed config.yaml from the backup only when none exists locally.
            if not self.path_mapping['config'].exists():
                src = download_dir / 'config' / 'config.yaml'
                if src.exists():
                    logger.info("No local config.yaml found, restoring from backup as initial value")
                    self.path_mapping['config'].parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(src, self.path_mapping['config'])

        for key in restore_keys:
            src_rel = self._BACKUP_LAYOUT[key]
            dst = self.path_mapping[key]
            src = download_dir / src_rel
            if not src.exists():
                logger.warning(f"Not found in backup: {src_rel}")
                continue
            try:
                if src.is_file():
                    dst.parent.mkdir(parents=True, exist_ok=True)
                    shutil.copy2(src, dst)
                    logger.info(f"Restored: {src_rel}")
                elif src.is_dir():
                    # Replace the directory so files deleted before the backup
                    # do not linger locally.
                    if dst.exists():
                        shutil.rmtree(dst)
                    shutil.copytree(src, dst)
                    logger.info(f"Restored directory: {src_rel}")
            except Exception as e:
                logger.error(f"Failed to restore {src_rel}: {e}")

        logger.success("Data restoration completed")


class ConfigFileHandler(FileSystemEventHandler):
    """Watchdog handler that triggers a backup when a key config file changes.

    Only modification events are handled here.
    NOTE(review): editors that save via rename would surface as moved/created
    events instead; confirm the scheduled sync is enough for those.
    """

    # Basenames of the files whose modification triggers an immediate backup.
    WATCHED_FILES = frozenset({'config.yaml', '.env', 'auth.json'})

    def __init__(self, manager: DatasetManager):
        """Store the manager used for uploads and initialise the debounce state."""
        super().__init__()
        self.manager = manager
        self.last_backup_time = 0
        self.backup_cooldown = 5  # minimum seconds between immediate backups

    def on_modified(self, event):
        """Back up immediately when a watched file is modified.

        Events for directories, for other files, and those inside the cooldown
        window are ignored.
        """
        if event.is_directory:
            return

        # Compare the exact basename: a suffix test would also match files
        # such as "prod.env" or "myconfig.yaml". fsdecode tolerates bytes paths.
        file_name = os.path.basename(os.fsdecode(event.src_path))
        if file_name not in self.WATCHED_FILES:
            return

        current_time = time.time()
        if current_time - self.last_backup_time <= self.backup_cooldown:
            return

        logger.info(f"Config file changed: {event.src_path}")
        logger.info("Triggering immediate backup...")
        try:
            uploaded = self.manager.upload_to_dataset()
        except Exception as e:
            logger.error(f"Immediate backup failed: {e}")
            return

        self.last_backup_time = current_time

        # upload_to_dataset reports failure through its return value, so check
        # it instead of logging success unconditionally.
        if not uploaded:
            logger.error("Immediate backup failed: upload did not complete")
            return

        logger.success("Immediate backup completed")
        self._trigger_reload()

    def _trigger_reload(self):
        """Tell the operator how to apply the change.

        Per the original author's note, Hermes has no config reload command,
        so the new configuration takes effect on the next Space restart.
        """
        logger.info("Configuration saved. Please restart Space to apply changes immediately.")


def run_daemon():
    """Run the sync daemon: scheduled backups plus optional file watching.

    The interval comes from ``$SYNC_INTERVAL`` (seconds, default 60). A value
    that is not a positive integer falls back to the default. The loop runs
    until interrupted with Ctrl-C; the file watcher, if started, is stopped
    on the way out. Exits with status 1 when the configuration is invalid.
    """
    logger.info("Starting data sync daemon...")

    default_interval = 60
    raw_interval = os.environ.get('SYNC_INTERVAL', str(default_interval))
    try:
        sync_interval = int(raw_interval)
    except ValueError:
        logger.warning(f"Invalid SYNC_INTERVAL {raw_interval!r}, using {default_interval} seconds")
        sync_interval = default_interval
    if sync_interval < 1:
        # Zero would upload back-to-back; a negative value makes time.sleep
        # raise on every iteration, which the loop below would swallow and
        # retry immediately.
        logger.warning(f"SYNC_INTERVAL must be >= 1, got {sync_interval}; using {default_interval} seconds")
        sync_interval = default_interval

    manager = DatasetManager()

    if not manager.validate():
        logger.error("Configuration invalid, exiting")
        sys.exit(1)

    logger.info(f"Sync interval: {sync_interval} seconds")

    # Start the file watcher when watchdog is installed; otherwise rely on the
    # scheduled sync alone.
    observer = None
    if WATCHDOG_AVAILABLE:
        try:
            logger.info("Starting file watcher for real-time sync...")
            event_handler = ConfigFileHandler(manager)
            observer = Observer()
            observer.schedule(event_handler, str(manager.hermes_home), recursive=False)
            observer.start()
            logger.success("File watcher started - config changes will trigger immediate backup")
        except Exception as e:
            logger.error(f"Failed to start file watcher: {e}")
            logger.warning("Falling back to scheduled sync only")
            observer = None
    else:
        logger.warning("Watchdog not available, using scheduled sync only")

    try:
        while True:
            try:
                time.sleep(sync_interval)
                logger.info("Performing scheduled backup...")
                manager.upload_to_dataset()
            except KeyboardInterrupt:
                logger.info("Daemon stopped")
                break
            except Exception as e:
                # Keep the daemon alive; the next tick retries.
                logger.error(f"Sync error: {e}")
    finally:
        if observer:
            logger.info("Stopping file watcher...")
            observer.stop()
            observer.join()
            logger.info("File watcher stopped")


def main():
    """Command-line entry point: parse the action and run it.

    ``backup`` and ``restore`` exit with status 0 on success and 1 on
    failure; ``daemon`` hands control to ``run_daemon``. An invalid
    configuration exits with status 1 before any action runs.
    """
    cli = argparse.ArgumentParser(description='Hermes Agent Data Sync')
    cli.add_argument(
        'action',
        choices=['backup', 'restore', 'daemon'],
        help='Action to perform',
    )
    cli.add_argument(
        '--force', '-f',
        action='store_true',
        help='Force backup even if no changes',
    )
    options = cli.parse_args()

    sync_manager = DatasetManager()
    if not sync_manager.validate():
        logger.error("Configuration invalid")
        sys.exit(1)

    requested = options.action

    # The daemon does not produce an exit status of its own.
    if requested == 'daemon':
        run_daemon()
        return

    if requested == 'backup':
        ok = sync_manager.upload_to_dataset(force=options.force)
    elif requested == 'restore':
        ok = sync_manager.download_from_dataset()
    else:
        return

    sys.exit(0 if ok else 1)


# Run the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()