File size: 7,812 Bytes
82f9be0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""
Redis adapter - handles sending and receiving messages.
"""

import redis
import json
import threading
import queue
import time
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Callable, Dict, Any

from .entities import EntityInfo
from .messages import Message


# Logging configuration.
# NOTE(review): basicConfig() runs at import time and sets up the root logger
# (when it has no handlers yet) at INFO level — confirm this is intended for
# a module that is imported by other code.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Type definitions: a message callback takes one Message and returns nothing.
MessageCallback = Callable[[Message], None]


@dataclass
class RedisConfig:
    """Connection settings for a Redis server."""

    host: str
    port: int
    db: int
    password: Optional[str] = None

    def to_connection_params(self) -> Dict[str, Any]:
        """Build the keyword arguments describing this connection.

        Returns:
            A dict with the keys ``host``, ``port`` and ``db``. The
            ``password`` key is added only when the password is truthy, so
            both ``None`` and the empty string are left out.
        """
        connection_kwargs: Dict[str, Any] = dict(
            host=self.host,
            port=self.port,
            db=self.db,
        )
        # Guard clause: nothing more to add without a usable password.
        if not self.password:
            return connection_kwargs
        connection_kwargs['password'] = self.password
        return connection_kwargs


class RedisAdapter:
    """Redis adapter that sends and receives messages over Redis pub/sub.

    Outgoing messages are queued by ``send_message`` and published by a
    background send thread. Incoming messages are read by a background
    receive thread from a subscription on ``entity_info.channel`` and handed
    to the callback registered through ``register_callback``.
    """

    def __init__(self, entity_info: EntityInfo):
        """Store the entity description and initialise idle state.

        No connection is opened and no thread is started here; ``start()``
        does both.

        Args:
            entity_info: Entity description. This class reads its ``id``,
                ``channel``, ``redis_host``, ``redis_port`` and ``redis_db``
                attributes.
        """
        self.entity_info = entity_info
        # NOTE: no password is taken from entity_info, so the config's
        # password keeps its default of None.
        self.redis_config = RedisConfig(
            host=entity_info.redis_host,
            port=entity_info.redis_port,
            db=entity_info.redis_db
        )

        # Redis connection
        self.redis_client: Optional[redis.Redis] = None

        # Sending side
        self.send_queue = queue.Queue()
        self.send_thread = None
        self.send_running = False

        # Receiving side
        self.receive_queue = queue.Queue()
        self.receive_thread = None
        self.receive_running = False
        self.message_callback: Optional[MessageCallback] = None

        # Connection state
        self.pubsub: Optional[redis.client.PubSub] = None
        self.connected = False

    @staticmethod
    def _thread_alive(thread) -> bool:
        """Return True when *thread* exists and is still running."""
        return thread is not None and thread.is_alive()

    def connect(self) -> bool:
        """Create the Redis client and verify the server with a PING.

        Returns:
            True when the ping succeeded, False otherwise. ``connected`` is
            updated to match, so a failed reconnect no longer leaves a stale
            ``True`` behind.
        """
        try:
            self.redis_client = redis.Redis(**self.redis_config.to_connection_params())
            self.redis_client.ping()  # connectivity test
        except Exception as e:
            self.connected = False
            logger.error(f"Failed to connect to Redis: {e}")
            return False

        self.connected = True
        logger.info(f"Entity {self.entity_info.id} connected to Redis")
        return True

    def disconnect(self):
        """Stop the worker threads and close the Redis client."""
        self.stop()
        if self.redis_client:
            self.redis_client.close()
        self.connected = False
        logger.info(f"Entity {self.entity_info.id} disconnected from Redis")

    def start(self) -> bool:
        """Connect and start the send and receive worker threads.

        Calling ``start()`` on an adapter whose workers are already running
        is a no-op; previously it spawned a second pair of threads, which
        made every incoming message get delivered twice.

        Returns:
            True when the adapter is running, False when the connection
            attempt failed.
        """
        send_alive = self._thread_alive(self.send_thread)
        receive_alive = self._thread_alive(self.receive_thread)
        if send_alive and receive_alive and self.send_running and self.receive_running:
            logger.warning(f"RedisAdapter already started for entity {self.entity_info.id}")
            return True

        if not self.connect():
            return False

        self.send_running = True
        self.receive_running = True

        # Start only the workers that are not already alive.
        if not send_alive:
            self.send_thread = threading.Thread(target=self._send_worker, daemon=True)
            self.send_thread.start()

        if not receive_alive:
            self.receive_thread = threading.Thread(target=self._receive_worker, daemon=True)
            self.receive_thread.start()

        logger.info(f"RedisAdapter started for entity {self.entity_info.id}")
        return True

    def stop(self):
        """Signal both workers to stop and wait up to 5 seconds for each.

        Safe to call from inside the message callback: the calling thread is
        never joined, because ``Thread.join`` raises ``RuntimeError`` when a
        thread tries to join itself.
        """
        self.send_running = False
        self.receive_running = False

        # Wait for the workers to finish.
        current = threading.current_thread()
        for worker in (self.send_thread, self.receive_thread):
            if worker is current or not self._thread_alive(worker):
                continue
            worker.join(timeout=5)

        logger.info(f"RedisAdapter stopped for entity {self.entity_info.id}")

    def send_message(self, receiver_id: str, content: str) -> bool:
        """Queue a message for asynchronous delivery.

        The message is only placed on the send queue here; the send worker
        publishes it later.

        Args:
            receiver_id: Identifier of the receiver; the send worker uses it
                as the target channel name.
            content: Message payload.

        Returns:
            True when the message was queued, False when building or queueing
            it raised an exception.
        """
        try:
            message = Message(
                sender_id=self.entity_info.id,
                receiver_id=receiver_id,
                timestamp=datetime.now(),
                content=content
            )
            self.send_queue.put(message)
            logger.debug(f"Message queued for {receiver_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to queue message: {e}")
            return False

    def register_callback(self, callback: MessageCallback):
        """Register the function called with each received message."""
        self.message_callback = callback

    def _send_worker(self):
        """Send loop: publish queued messages until ``send_running`` is cleared.

        Messages still queued when the loop exits are not published.
        """
        logger.info("Send worker thread started")

        while self.send_running:
            try:
                # Wait at most 1 second so the stop flag is re-checked regularly.
                message = self.send_queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                # The receiver's own Redis connection info would be needed here;
                # simplified for now: the same Redis instance is used and the
                # receiver id doubles as the channel name.
                target_channel = message.receiver_id

                client = self.redis_client
                if client is None:
                    logger.warning("Redis client not available, message dropped")
                else:
                    client.publish(
                        target_channel,
                        json.dumps(message.to_dict())
                    )
                    logger.debug(f"Message sent to channel {target_channel}")
            except Exception as e:
                logger.error(f"Error in send worker: {e}")
            finally:
                # Every successful get() is matched by exactly one task_done(),
                # even when publishing fails; otherwise send_queue.join()
                # would block forever.
                self.send_queue.task_done()

        logger.info("Send worker thread stopped")

    def _receive_worker(self):
        """Receive loop: read the subscription until ``receive_running`` is cleared.

        Each pub/sub entry of type ``message`` is JSON-decoded, converted
        with ``Message.from_dict``, queued and dispatched to the callback.
        """
        logger.info("Receive worker thread started")

        if self.redis_client is None:
            logger.error("Redis client not available for receiving")
            return

        try:
            # Create the pubsub object and subscribe to this entity's channel.
            self.pubsub = self.redis_client.pubsub()
            self.pubsub.subscribe(self.entity_info.channel)

            while self.receive_running:
                try:
                    # Poll for a message (1 second timeout).
                    raw = self.pubsub.get_message(timeout=1)
                except Exception as e:
                    logger.error(f"Error processing received message: {e}")
                    # Back off briefly: when the connection is down,
                    # get_message can fail immediately on every call, which
                    # would otherwise spin the CPU and flood the log.
                    time.sleep(1)
                    continue

                try:
                    if not raw or raw['type'] != 'message':
                        continue

                    # Payload is bytes by default, but str when the client
                    # decodes responses; accept both.
                    payload = raw['data']
                    if isinstance(payload, bytes):
                        payload = payload.decode('utf-8')
                    received_message = Message.from_dict(json.loads(payload))

                    # Queue the message, then drain the receive queue.
                    self.receive_queue.put(received_message)
                    self._process_receive_queue()
                except Exception as e:
                    logger.error(f"Error processing received message: {e}")

        except Exception as e:
            logger.error(f"Error in receive worker: {e}")
        finally:
            if self.pubsub is not None:
                self.pubsub.close()

        logger.info("Receive worker thread stopped")

    def _process_receive_queue(self):
        """Drain the receive queue, passing each message to the callback.

        An exception raised by the callback is logged and does not stop the
        drain. When no callback is registered the message is discarded, as
        before, but the drop is now logged instead of being silent.
        """
        while True:
            try:
                # get_nowait() + Empty avoids the empty()/get race.
                message = self.receive_queue.get_nowait()
            except queue.Empty:
                return

            try:
                callback = self.message_callback
                if callback is None:
                    logger.warning("No message callback registered, message dropped")
                else:
                    callback(message)
                    logger.debug("Message delivered to callback")
            except Exception as e:
                logger.error(f"Error in message callback: {e}")
            finally:
                self.receive_queue.task_done()