Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Sync utility for OpenClaw HF Deployment | |
| Handles agent-channel synchronization and state management | |
| """ | |
| import os | |
| import json | |
| from pathlib import Path | |
def sync_agent_bindings():
    """Rebuild each agent's ``channels`` list from the bindings file.

    Reads ``openclaw.json`` and ``agent_bindings.json`` from the directory
    named by the ``OPENCLAW_STATE_DIR`` environment variable (default
    ``/root/.openclaw``). For every agent in the config's ``agents`` list,
    the ``channels`` entry is replaced by the bindings whose ``agent_id``
    equals the agent's ``id``; the config is then written back in place
    with a two-space indent.

    Nothing is written when either file is missing; a ``[Sync]`` message is
    printed instead. A ``null`` ``agents`` value or a ``null`` bindings file
    is treated as empty.

    NOTE(review): assumes ``agents`` and the bindings file are JSON lists of
    objects -- confirm against whatever produces these files.

    Raises:
        json.JSONDecodeError: If either file does not contain valid JSON.
        OSError: If a file cannot be read or the config cannot be written.
    """
    state_dir = os.environ.get("OPENCLAW_STATE_DIR", "/root/.openclaw")
    config_path = f"{state_dir}/openclaw.json"
    bindings_path = f"{state_dir}/agent_bindings.json"

    if not os.path.exists(config_path):
        print("[Sync] Config not found, skipping")
        return

    # JSON files are UTF-8; being explicit keeps the result independent of
    # the platform's default locale encoding.
    with open(config_path, "r", encoding="utf-8") as f:
        config = json.load(f)

    if not os.path.exists(bindings_path):
        print("[Sync] No bindings file found")
        return

    with open(bindings_path, "r", encoding="utf-8") as f:
        bindings = json.load(f) or []

    # `or []` also covers an explicit `"agents": null` in the config.
    agents = config.get("agents") or []
    for agent in agents:
        agent_id = agent.get("id")
        # Existing channels are discarded: the bindings file is the single
        # source of truth for which channels an agent owns.
        agent["channels"] = [
            {
                "id": binding.get("channel_id"),
                "type": binding.get("channel_type"),
                "status": binding.get("status"),
            }
            for binding in bindings
            if binding.get("agent_id") == agent_id
        ]

    with open(config_path, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)

    print(f"[Sync] Updated {len(agents)} agents with channel bindings")
def main() -> None:
    """Announce the run, perform one binding sync, then report completion."""
    print("[Sync] Starting synchronization...")
    sync_agent_bindings()
    print("[Sync] Completed!")


# Run the sync only when executed as a script, not when imported.
if __name__ == "__main__":
    main()