""" Human-Clone 系统集成测试 """ import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) import unittest import time import threading from datetime import datetime from src.core import EntityInfo, RedisAdapter from tests.fixtures import create_test_entity_a, create_test_entity_b, get_test_messages class TestCommunication(unittest.TestCase): """通讯集成测试类""" def test_basic_communication(self): """测试基础通讯功能""" print("=== Human-Clone 系统通讯测试 ===") # 创建测试主体 entity_a = create_test_entity_a() entity_b = create_test_entity_b() # 创建适配器 adapter_a = RedisAdapter(entity_a) adapter_b = RedisAdapter(entity_b) # 消息接收回调 received_messages = [] def message_callback(message): received_messages.append(message) print(f"[{message.receiver_id}] 收到来自 {message.sender_id} 的消息: {message.content}") # 启动适配器 print("\n1. 启动适配器...") if adapter_a.start() and adapter_b.start(): print("✓ 适配器启动成功") # 注册回调 adapter_b.register_callback(message_callback) # 等待连接建立 time.sleep(1) # 发送测试消息 print("\n2. 发送测试消息...") test_messages = get_test_messages() for msg in test_messages: if adapter_a.send_message(entity_b.id, msg): print(f"✓ 消息已发送: {msg}") time.sleep(0.5) # 等待消息接收 print("\n3. 等待消息接收...") time.sleep(2) # 检查结果 print(f"\n4. 测试结果:") print(f" 发送消息数: {len(test_messages)}") print(f" 接收消息数: {len(received_messages)}") if len(received_messages) == len(test_messages): print("✓ 通讯测试通过!") else: print("✗ 通讯测试失败!") else: print("✗ 适配器启动失败") # 清理资源 print("\n5. 清理资源...") adapter_a.stop() adapter_b.stop() print("\n=== 测试完成 ===") if __name__ == "__main__": unittest.main()