|
|
""" |
|
|
Redis适配器 - 处理消息发送和接收 |
|
|
""" |
|
|
|
|
|
import redis |
|
|
import json |
|
|
import threading |
|
|
import queue |
|
|
import time |
|
|
import logging |
|
|
from dataclasses import dataclass |
|
|
from datetime import datetime |
|
|
from typing import Optional, Callable, Dict, Any |
|
|
|
|
|
from .entities import EntityInfo |
|
|
from .messages import Message |
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): this configures the root logger as an import-time side effect,
# which affects every importer of this module. It is kept as-is because
# callers may rely on it; consider moving it to the application entry point.
logging.basicConfig(level=logging.INFO)

# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)

# Signature of the callable invoked with each received Message.
MessageCallback = Callable[[Message], None]
|
|
|
|
|
|
|
|
@dataclass
class RedisConfig:
    """Connection settings for a Redis server."""

    host: str
    port: int
    db: int
    password: Optional[str] = None

    def to_connection_params(self) -> Dict[str, Any]:
        """Return the settings as a dict of connection keyword arguments.

        Returns:
            A dict with the keys ``host``, ``port`` and ``db``. The
            ``password`` key is added only when a non-empty password is set.
        """
        params: Dict[str, Any] = {
            'host': self.host,
            'port': self.port,
            'db': self.db,
        }
        # A missing or empty password is left out entirely rather than
        # passed through as None / ''.
        if self.password:
            params['password'] = self.password
        return params
|
|
|
|
|
|
|
|
class RedisAdapter:
    """Redis pub/sub adapter that sends and receives messages for one entity.

    Outgoing messages are queued by ``send_message`` and published by a
    background sender thread to the channel named after the receiver id.
    Incoming messages are read by a background receiver thread subscribed to
    ``entity_info.channel`` and handed to the registered callback.
    """

    # Pause after a failed pub/sub read so that a dropped connection does not
    # turn the receive loop into a busy loop that floods the log.
    _RECEIVE_ERROR_BACKOFF_SECONDS = 1.0

    def __init__(self, entity_info: "EntityInfo"):
        """Store the entity description and prepare queues and thread state.

        No connection is opened here; call ``start`` (or ``connect``).

        Args:
            entity_info: Entity description; this class reads its ``id``,
                ``channel``, ``redis_host``, ``redis_port`` and ``redis_db``
                attributes.
        """
        self.entity_info = entity_info
        self.redis_config = RedisConfig(
            host=entity_info.redis_host,
            port=entity_info.redis_port,
            db=entity_info.redis_db,
        )

        self.redis_client: Optional[redis.Redis] = None

        # Outgoing side: queue drained by the sender thread.
        self.send_queue = queue.Queue()
        self.send_thread: Optional[threading.Thread] = None
        self.send_running = False

        # Incoming side: queue filled and drained by the receiver thread.
        self.receive_queue = queue.Queue()
        self.receive_thread: Optional[threading.Thread] = None
        self.receive_running = False
        self.message_callback: Optional[MessageCallback] = None

        self.pubsub: Optional[redis.client.PubSub] = None
        self.connected = False

    def connect(self) -> bool:
        """Create the Redis client and verify the connection with a ping.

        Returns:
            True when the ping succeeded, False otherwise (the error is
            logged, not raised).
        """
        try:
            self.redis_client = redis.Redis(**self.redis_config.to_connection_params())
            self.redis_client.ping()
            self.connected = True
            logger.info(f"Entity {self.entity_info.id} connected to Redis")
            return True
        except Exception as e:
            # Do not leave a stale True behind when a reconnect attempt fails.
            self.connected = False
            logger.error(f"Failed to connect to Redis: {e}")
            return False

    def disconnect(self):
        """Stop the worker threads and close the Redis client."""
        self.stop()
        if self.redis_client:
            self.redis_client.close()
        self.connected = False
        logger.info(f"Entity {self.entity_info.id} disconnected from Redis")

    def start(self) -> bool:
        """Connect and start the sender and receiver threads.

        Calling ``start`` while the workers are already running is a no-op,
        so duplicate threads (and duplicate deliveries) are never created.

        Returns:
            True when the adapter is running, False when connecting failed.
        """
        if self._workers_running():
            logger.warning(f"RedisAdapter already started for entity {self.entity_info.id}")
            return True

        if not self.connect():
            return False

        self.send_running = True
        self.receive_running = True

        self.send_thread = threading.Thread(target=self._send_worker, daemon=True)
        self.send_thread.start()

        self.receive_thread = threading.Thread(target=self._receive_worker, daemon=True)
        self.receive_thread.start()

        logger.info(f"RedisAdapter started for entity {self.entity_info.id}")
        return True

    def stop(self):
        """Signal both worker threads to exit and wait up to 5 s for each."""
        self.send_running = False
        self.receive_running = False

        for worker in (self.send_thread, self.receive_thread):
            if worker and worker.is_alive():
                worker.join(timeout=5)

        logger.info(f"RedisAdapter stopped for entity {self.entity_info.id}")

    def send_message(self, receiver_id: str, content: str) -> bool:
        """Queue a message for asynchronous delivery.

        Args:
            receiver_id: Id of the receiving entity; the sender thread
                publishes to the channel with this name.
            content: Message body.

        Returns:
            True when the message was queued, False when building or queueing
            it failed (the error is logged, not raised).
        """
        try:
            message = Message(
                sender_id=self.entity_info.id,
                receiver_id=receiver_id,
                timestamp=datetime.now(),
                content=content,
            )
            self.send_queue.put(message)
            logger.debug(f"Message queued for {receiver_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to queue message: {e}")
            return False

    def register_callback(self, callback: "MessageCallback"):
        """Register the callable invoked with each received message."""
        self.message_callback = callback

    def _workers_running(self) -> bool:
        """Return True when a worker is flagged as running and its thread is alive."""
        send_alive = self.send_thread is not None and self.send_thread.is_alive()
        receive_alive = self.receive_thread is not None and self.receive_thread.is_alive()
        return (self.send_running and send_alive) or (self.receive_running and receive_alive)

    @staticmethod
    def _decode_payload(raw) -> Any:
        """Parse a pub/sub payload as JSON.

        Accepts both bytes (decoded as UTF-8) and str, so the adapter also
        works with a client that already decodes responses.
        """
        if isinstance(raw, (bytes, bytearray)):
            raw = raw.decode('utf-8')
        return json.loads(raw)

    def _publish(self, message) -> None:
        """Publish one message as JSON to the channel named by its receiver id."""
        target_channel = message.receiver_id
        client = self.redis_client
        if not client:
            # Previously this case dropped the message without any trace.
            logger.warning(f"Redis client not available; message for {target_channel} dropped")
            return
        client.publish(target_channel, json.dumps(message.to_dict()))
        logger.debug(f"Message sent to channel {target_channel}")

    def _send_worker(self):
        """Sender thread body: publish queued messages until asked to stop."""
        logger.info("Send worker thread started")

        while self.send_running:
            try:
                # The 1 s timeout lets the loop re-check send_running regularly.
                message = self.send_queue.get(timeout=1)
            except queue.Empty:
                continue

            try:
                self._publish(message)
            except Exception as e:
                logger.error(f"Error in send worker: {e}")
            finally:
                # Always balance get() with task_done(), otherwise a failed
                # publish would make send_queue.join() block forever.
                self.send_queue.task_done()

        logger.info("Send worker thread stopped")

    def _receive_worker(self):
        """Receiver thread body: read from the entity channel until asked to stop."""
        logger.info("Receive worker thread started")

        if not self.redis_client:
            logger.error("Redis client not available for receiving")
            return

        try:
            self.pubsub = self.redis_client.pubsub()
            self.pubsub.subscribe(self.entity_info.channel)

            while self.receive_running:
                try:
                    # The 1 s timeout lets the loop re-check receive_running.
                    message = self.pubsub.get_message(timeout=1)
                except Exception as e:
                    logger.error(f"Error processing received message: {e}")
                    # Back off instead of retrying immediately in a tight loop.
                    time.sleep(self._RECEIVE_ERROR_BACKOFF_SECONDS)
                    continue

                # Skip timeouts (None) and non-data events such as the
                # subscribe confirmation.
                if not message or message['type'] != 'message':
                    continue

                try:
                    message_data = self._decode_payload(message['data'])
                    received_message = Message.from_dict(message_data)
                    self.receive_queue.put(received_message)
                    self._process_receive_queue()
                except Exception as e:
                    logger.error(f"Error processing received message: {e}")

        except Exception as e:
            logger.error(f"Error in receive worker: {e}")
        finally:
            if self.pubsub:
                self.pubsub.close()

        logger.info("Receive worker thread stopped")

    def _process_receive_queue(self):
        """Drain the receive queue, passing each message to the callback.

        Messages are removed from the queue even when no callback is
        registered. Exceptions raised by the callback are logged and do not
        stop the drain.
        """
        try:
            while True:
                try:
                    message = self.receive_queue.get_nowait()
                except queue.Empty:
                    break

                callback = self.message_callback
                if callback:
                    try:
                        callback(message)
                        logger.debug("Message delivered to callback")
                    except Exception as e:
                        logger.error(f"Error in message callback: {e}")

                self.receive_queue.task_done()
        except Exception as e:
            logger.error(f"Error processing receive queue: {e}")