"""
Redis client module — simplified for fetching messages only.

Connects to Redis and retrieves recent messages from a room for summarization.
When Redis cannot be reached, the client falls back to a JSON file on disk.
"""

import os
import sys
import json
import logging
from typing import Optional, Any
import redis
import time
from datetime import datetime

# Project root (originally added so config could be loaded); it also anchors
# the path of the local fallback database.
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))

from .config import REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD, REDIS_KEY_PREFIX

logger = logging.getLogger(__name__)


class RedisClient:
    """Singleton Redis client for fetching messages.

    Every public method has two code paths: the Redis path (``self._client``)
    and a local JSON path (``self._use_local``) that is selected when the
    initial connection attempt fails. Public methods catch exceptions, log
    them and return a neutral value (``False`` / ``[]``) instead of raising.
    """

    _instance = None
    _client: Optional[redis.Redis] = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # NOTE: in local-fallback mode ``_client`` stays None, so every
        # ``RedisClient()`` call re-runs this block and retries the connection.
        if self._client is None:
            self._key_prefix = REDIS_KEY_PREFIX
            self._use_local = False
            self._local_db_path = os.path.join(project_root, "data", "local_db.json")
            try:
                self._client = redis.Redis(
                    host=REDIS_HOST,
                    port=REDIS_PORT,
                    db=REDIS_DB,
                    password=REDIS_PASSWORD,
                    decode_responses=True,
                    socket_connect_timeout=2,  # Short timeout so the fallback kicks in faster
                )
                self._client.ping()
                logger.info(f"Redis client connected to {REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}")
            except Exception as e:
                self._use_local = True
                self._client = None
                logger.warning(f"Redis connection failed. Using local storage: {self._local_db_path}")
                logger.debug("Redis connection error: %s", e)
                os.makedirs(os.path.dirname(self._local_db_path), exist_ok=True)
                if not os.path.exists(self._local_db_path):
                    with open(self._local_db_path, "w", encoding="utf-8") as f:
                        json.dump(self._empty_local_db(), f)

    # --- Local storage helpers ---

    @staticmethod
    def _empty_local_db() -> dict:
        """Return a fresh, empty local database structure."""
        return {"messages": {}, "events": {}, "memories": {}}

    def _load_local(self) -> dict:
        """Load the local JSON database; return an empty structure on any error."""
        try:
            with open(self._local_db_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception:
            return self._empty_local_db()

    def _save_local(self, data: dict) -> bool:
        """Persist ``data`` to the local JSON database.

        The payload is serialized before the file is touched and then swapped
        in with ``os.replace``, so a serialization or write error can no longer
        leave a truncated database behind.

        Returns:
            True when the file was written, False otherwise (error is logged).
        """
        try:
            payload = json.dumps(data, ensure_ascii=False, indent=2)
            tmp_path = f"{self._local_db_path}.tmp"
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(payload)
            os.replace(tmp_path, self._local_db_path)
            return True
        except Exception as e:
            logger.error(f"Failed to save local DB: {e}")
            return False

    def _key(self, *parts: str) -> str:
        """Build a prefixed key."""
        return f"{self._key_prefix}:{':'.join(str(p) for p in parts)}"

    def ping(self) -> bool:
        """Test Redis connection (always True in local mode)."""
        if self._use_local:
            return True
        try:
            return self._client.ping()
        except Exception as e:
            logger.error(f"Redis ping failed: {e}")
            return False

    # --- Message Methods ---

    def get_active_conversations(self) -> list[str]:
        """Return the IDs of rooms/DMs that currently have stored messages."""
        try:
            if self._use_local:
                return list(self._load_local().get("messages", {}))

            # Scan room:messages:*, dm:messages:* and chat:messages:* keys.
            patterns = ["room:messages:*", "dm:messages:*", "chat:messages:*"]
            conversation_ids = set()
            for p in patterns:
                # Try both the bare pattern and the prefixed one.
                for pattern in (p, self._key(p)):
                    # SCAN instead of KEYS so the server is not blocked.
                    for key in self._client.scan_iter(pattern, count=100):
                        # The ID is the last segment of the key; the set
                        # removes duplicates.
                        conversation_ids.add(key.split(":")[-1])
            return list(conversation_ids)
        except Exception as e:
            logger.error(f"Failed to get active conversations: {e}")
            return []

    def get_chat_history(self, conversation_id: str, limit: int = 100) -> list[dict]:
        """Fetch chat history for a room or DM (auto-detected from the ID)."""
        if conversation_id.startswith("room-"):
            room_id = conversation_id[len("room-"):]
            return self.get_room_messages(room_id, limit=limit)
        return self.get_messages_by_conversation_id(conversation_id, limit=limit)

    def save_room_messages(self, room_id: str, messages: list[dict]) -> bool:
        """Overwrite messages for a room (used for test seeding).

        In Redis mode each message is indexed in a sorted set (score = the
        message timestamp in ms, or its position when it has none) and stored
        as a hash of stringified fields. A ``room_id`` field is added when the
        message lacks one so that ``get_room_messages`` can find it again.
        """
        try:
            if self._use_local:
                db = self._load_local()
                db.setdefault("messages", {})[room_id] = messages
                return self._save_local(db)

            key = self._key("room", "messages", room_id)
            self._client.delete(key)
            for i, msg in enumerate(messages):
                msg_id = msg.get("id") or f"{room_id}_{i}"
                ts = i
                if "timestamp" in msg:
                    try:
                        dt = datetime.fromisoformat(msg["timestamp"].replace("Z", "+00:00"))
                        ts = int(dt.timestamp() * 1000)
                    except (AttributeError, TypeError, ValueError):
                        # Unparseable timestamp: fall back to "now", keeping order.
                        ts = int(time.time() * 1000) + i
                self._client.zadd(key, {msg_id: ts})
                fields = {k: str(v) for k, v in msg.items()}
                fields.setdefault("room_id", str(room_id))
                self._client.hset(self._key("msg", msg_id), mapping=fields)
            return True
        except Exception as e:
            logger.error(f"Failed to save room messages: {e}")
            return False

    def get_room_messages(self, room_id: str, limit: int = 100) -> list[dict]:
        """
        Fetch recent messages from a room by scanning msg* hashes
        filtered by room_id field.

        Both the bare ``msg*`` pattern and the prefixed ``<prefix>:msg:*``
        pattern (the one ``save_room_messages`` writes to) are scanned.
        Messages whose ``deleted`` field is ``"true"`` are skipped. The result
        is sorted by ``created_at`` (or ``timestamp``) and truncated to the
        last ``limit`` entries.
        """
        if self._use_local:
            db = self._load_local()
            msgs = db.get("messages", {}).get(room_id, [])
            return msgs[-limit:]

        try:
            messages = []
            seen = set()
            for pattern in ("msg*", self._key("msg", "*")):
                for key in self._client.scan_iter(pattern, count=100):
                    # SCAN may repeat keys, and the two patterns can overlap.
                    if key in seen:
                        continue
                    seen.add(key)
                    if self._client.hget(key, "room_id") != room_id:
                        continue
                    msg_data = self._client.hgetall(key)
                    if msg_data and msg_data.get("deleted") != "true":
                        msg_data["_key"] = key
                        messages.append(msg_data)
            messages.sort(key=lambda x: x.get("created_at") or x.get("timestamp", ""))
            return messages[-limit:]
        except Exception as e:
            logger.error(f"Failed to fetch room messages: {e}")
            return []

    def get_dm_messages(self, dm_id: str, limit: int = 100) -> list[dict]:
        """
        Fetch recent messages from a DM conversation.

        Returns the newest ``limit`` messages in chronological order. Local
        mode has no DM storage and always returns an empty list.
        """
        if self._use_local:
            return []

        try:
            # DMs often have different key patterns based on the DB inspection
            key = f"dm:messages:{dm_id}"
            if not self._client.exists(key):
                key = self._key("dm", "messages", dm_id)

            message_ids = self._client.zrevrange(key, 0, limit - 1)
            messages = []
            for msg_id in message_ids:
                # Based on inspection, DM message hashes are prefixed with 'dmmsg:'
                h_key = f"dmmsg:{msg_id}"
                if not self._client.exists(h_key):
                    h_key = self._key("dmmsg", msg_id)

                msg_data = self._client.hgetall(h_key)
                if msg_data:
                    # Map 'sender_id' to 'senderName' if needed by summarizer
                    if "sender_id" in msg_data and "senderName" not in msg_data:
                        msg_data["senderName"] = msg_data["sender_id"]
                    messages.append(msg_data)
            messages.reverse()
            return messages
        except Exception as e:
            logger.error(f"Error fetching DM messages: {e}")
            return []

    def get_messages_by_conversation_id(self, conversation_id: str, limit: int = 100) -> list[dict]:
        """
        Fetch messages that belong to a specific conversation by scanning hashes.
        Useful when sorted set index is missing or inconsistent.
        If conversation_id starts with "room-", delegates to get_room_messages.

        In local mode the messages stored under ``conversation_id`` in the
        local database are returned.
        """
        if conversation_id.startswith("room-"):
            room_id = conversation_id[len("room-"):]
            return self.get_room_messages(room_id, limit=limit)

        try:
            if self._use_local:
                msgs = self._load_local().get("messages", {}).get(conversation_id, [])
                return msgs[-limit:]

            messages = []
            for k in self._client.scan_iter("dmmsg:*", count=100):
                data = self._client.hgetall(k)
                if data.get("conversation_id") == conversation_id:
                    # Normalize field names to what the summarizer reads.
                    if "sender_id" in data:
                        data["senderName"] = data["sender_id"]
                    if "created_at" in data:
                        data["timestamp"] = data["created_at"]
                    messages.append(data)

            messages.sort(key=lambda x: x.get("timestamp", ""))
            return messages[-limit:]
        except Exception as e:
            logger.error(f"Error fetching messages by conversation_id: {e}")
            return []

    # --- Event Management Methods ---

    def save_event(self, event_data: dict) -> bool:
        """Save an event to Redis or Local JSON.

        Requires ``event_data["id"]``. In Redis mode a ``timestamp`` field
        (ms, as a string) is written into ``event_data``; it is derived from
        the ISO ``time`` field when parseable, else from the current time.
        """
        try:
            event_id = event_data.get("id")
            if not event_id:
                return False

            if self._use_local:
                db = self._load_local()
                db.setdefault("events", {})[event_id] = event_data
                return self._save_local(db)

            # ISO timestamp -> unix timestamp logic
            ts = int(datetime.now().timestamp() * 1000)
            if "time" in event_data:
                try:
                    dt = datetime.fromisoformat(event_data["time"])
                    ts = int(dt.timestamp() * 1000)
                except (ValueError, TypeError):
                    # Unparseable or non-string time: keep the "now" fallback.
                    pass

            event_data["timestamp"] = str(ts)
            self._client.hset(self._key("evt", event_id), mapping=event_data)
            self._client.zadd(self._key("evt", "index"), {event_id: ts})
            return True
        except Exception as e:
            logger.error(f"Failed to save event: {e}")
            return False

    def list_events(self, start_ts: int = 0, end_ts: int = 4000000000000) -> list[dict]:
        """List events whose time (ms since epoch) lies in [start_ts, end_ts]."""
        try:
            if self._use_local:
                db = self._load_local()
                filtered = []
                for ev in db.get("events", {}).values():
                    try:
                        # Parse time to check against range
                        dt = datetime.fromisoformat(ev["time"])
                        ts = int(dt.timestamp() * 1000)
                        if start_ts <= ts <= end_ts:
                            filtered.append(ev)
                        else:
                            logger.info(f"Event {ev.get('name')} ({ev.get('time')}) excluded: ts {ts} outside {start_ts}-{end_ts}")
                    except (ValueError, KeyError, TypeError):
                        # Fallback: if no time, only include if full range
                        if start_ts == 0 and end_ts >= 3000000000000:
                            filtered.append(ev)
                return filtered

            index_key = self._key("evt", "index")
            event_ids = self._client.zrangebyscore(index_key, start_ts, end_ts)
            events = []
            for eid in event_ids:
                data = self._client.hgetall(self._key("evt", eid))
                if data:
                    events.append(data)
            return events
        except Exception as e:
            logger.error(f"Failed to list events: {e}")
            return []

    def delete_event(self, event_id: str) -> bool:
        """Delete an event from storage and from the event index."""
        try:
            if self._use_local:
                db = self._load_local()
                events = db.get("events", {})
                if event_id in events:
                    del events[event_id]
                    return self._save_local(db)
                return False

            self._client.delete(self._key("evt", event_id))
            self._client.zrem(self._key("evt", "index"), event_id)
            return True
        except Exception as e:
            logger.error(f"Failed to delete event {event_id}: {e}")
            return False

    # --- Reminder Methods ---

    def save_reminder(self, reminder_data: dict) -> bool:
        """Save a reminder. Requires ``reminder_data["id"]``."""
        try:
            rem_id = reminder_data.get("id")
            if not rem_id:
                return False
            ts = int(reminder_data.get("timestamp") or (time.time() * 1000))

            if self._use_local:
                db = self._load_local()
                # The local DB is not created with a "reminders" section.
                db.setdefault("reminders", {})[rem_id] = reminder_data
                return self._save_local(db)

            self._client.hset(self._key("rem", rem_id), mapping=reminder_data)
            self._client.zadd(self._key("rem", "index"), {rem_id: ts})
            return True
        except Exception as e:
            logger.error(f"Failed to save reminder: {e}")
            return False

    def list_reminders(self, limit: int = 50) -> list[dict]:
        """List up to ``limit`` reminders (highest index score first in Redis)."""
        try:
            if self._use_local:
                db = self._load_local()
                return list(db.get("reminders", {}).values())[:limit]

            rem_ids = self._client.zrevrange(self._key("rem", "index"), 0, limit - 1)
            reminders = []
            for rid in rem_ids:
                data = self._client.hgetall(self._key("rem", rid))
                if data:
                    reminders.append(data)
            return reminders
        except Exception as e:
            logger.error(f"Failed to list reminders: {e}")
            return []

    # --- Decision Log Methods ---

    def save_decision(self, room_id: str, decision_data: dict) -> bool:
        """Prepend a decision to the room's decision log.

        A ``timestamp`` field (ms, as a string) is written into ``decision_data``.
        """
        try:
            ts = int(time.time() * 1000)
            decision_data["timestamp"] = str(ts)
            payload = json.dumps(decision_data, ensure_ascii=False)

            if self._use_local:
                db = self._load_local()
                db.setdefault("decisions", {}).setdefault(room_id, []).insert(0, decision_data)
                return self._save_local(db)

            self._client.lpush(self._key("room", "decisions", room_id), payload)
            return True
        except Exception as e:
            logger.error(f"Failed to save decision: {e}")
            return False

    def list_decisions(self, room_id: str, limit: int = 20) -> list[dict]:
        """Return the room's most recent decisions, newest first."""
        try:
            if self._use_local:
                db = self._load_local()
                return db.get("decisions", {}).get(room_id, [])[:limit]

            raw = self._client.lrange(self._key("room", "decisions", room_id), 0, limit - 1)
            return [json.loads(r) for r in raw]
        except Exception as e:
            logger.error(f"Failed to list decisions: {e}")
            return []

    # --- Task Management Methods ---

    def save_task(self, room_id: str, task_data: dict) -> bool:
        """Prepend an assigned task to the room's task list.

        ``status`` defaults to ``"pending"`` and a ``timestamp`` field (ms, as
        a string) is written into ``task_data``.
        """
        try:
            ts = int(time.time() * 1000)
            task_data.setdefault("status", "pending")
            task_data["timestamp"] = str(ts)
            payload = json.dumps(task_data, ensure_ascii=False)

            if self._use_local:
                db = self._load_local()
                db.setdefault("tasks", {}).setdefault(room_id, []).insert(0, task_data)
                return self._save_local(db)

            self._client.lpush(self._key("room", "tasks", room_id), payload)
            return True
        except Exception as e:
            logger.error(f"Failed to save task: {e}")
            return False

    def list_tasks(self, room_id: str, status: str = None, limit: int = 50) -> list[dict]:
        """List tasks in a room, optionally filtered by ``status``.

        Returns at most ``limit`` tasks, newest first.
        """
        try:
            if self._use_local:
                db = self._load_local()
                tasks = db.get("tasks", {}).get(room_id, [])
            else:
                # With a status filter the whole list must be read first;
                # truncating before filtering would drop matching tasks.
                end = -1 if status else limit - 1
                raw = self._client.lrange(self._key("room", "tasks", room_id), 0, end)
                tasks = [json.loads(r) for r in raw]

            if status:
                tasks = [t for t in tasks if t.get("status") == status]
            return tasks[:limit]
        except Exception as e:
            logger.error(f"Failed to list tasks: {e}")
            return []

    def update_task_status(self, room_id: str, task_index: int, new_status: str) -> bool:
        """Update the status of the task at ``task_index`` in the room's list."""
        try:
            key = self._key("room", "tasks", room_id)

            if self._use_local:
                db = self._load_local()
                tasks = db.get("tasks", {}).get(room_id, [])
                if 0 <= task_index < len(tasks):
                    tasks[task_index]["status"] = new_status
                    return self._save_local(db)
                return False

            raw = self._client.lindex(key, task_index)
            if not raw:
                return False
            task = json.loads(raw)
            task["status"] = new_status
            self._client.lset(key, task_index, json.dumps(task, ensure_ascii=False))
            return True
        except Exception as e:
            logger.error(f"Failed to update task status: {e}")
            return False

    # --- Reward/Leaderboard Methods ---

    def add_reward(self, room_id: str, user_name: str, points: int = 1) -> bool:
        """Add ``points`` to a member's score on the room leaderboard."""
        try:
            if self._use_local:
                db = self._load_local()
                lb = db.setdefault("leaderboard", {}).setdefault(room_id, {})
                lb[user_name] = lb.get(user_name, 0) + points
                return self._save_local(db)

            self._client.zincrby(self._key("room", "leaderboard", room_id), points, user_name)
            return True
        except Exception as e:
            logger.error(f"Failed to add reward: {e}")
            return False

    def get_leaderboard(self, room_id: str, top_n: int = 10) -> list[dict]:
        """Return the room's top ``top_n`` contributors as {"user", "points"} dicts."""
        try:
            if self._use_local:
                db = self._load_local()
                lb = db.get("leaderboard", {}).get(room_id, {})
                sorted_lb = sorted(lb.items(), key=lambda x: x[1], reverse=True)[:top_n]
                return [{"user": u, "points": p} for u, p in sorted_lb]

            results = self._client.zrevrange(
                self._key("room", "leaderboard", room_id), 0, top_n - 1, withscores=True
            )
            return [{"user": u, "points": int(s)} for u, s in results]
        except Exception as e:
            logger.error(f"Failed to get leaderboard: {e}")
            return []

    # --- Memory/Knowledge Management Methods ---

    def save_memory(self, memory_data: dict) -> bool:
        """Save memory. Requires ``memory_data["id"]``.

        A ``timestamp`` field (ms, as a string) is written into ``memory_data``,
        taken from the existing value when present, else from the current time.
        """
        try:
            mem_id = memory_data.get("id")
            if not mem_id:
                return False

            ts = int(memory_data.get("timestamp") or (time.time() * 1000))
            memory_data["timestamp"] = str(ts)

            if self._use_local:
                db = self._load_local()
                db.setdefault("memories", {})[mem_id] = memory_data
                return self._save_local(db)

            self._client.hset(self._key("mem", mem_id), mapping=memory_data)
            self._client.zadd(self._key("mem", "index"), {mem_id: ts})
            return True
        except Exception as e:
            logger.error(f"Failed to save memory: {e}")
            return False

    def list_memories(self, query: Optional[str] = None, limit: int = 50) -> list[dict]:
        """List memories, optionally filtered by a case-insensitive ``query``.

        ``query`` is matched as a substring of the ``content`` and ``category``
        fields. At most ``limit`` memories are returned.
        """
        try:
            if self._use_local:
                db = self._load_local()
                mems = list(db.get("memories", {}).values())
            else:
                index_key = self._key("mem", "index")
                # With a query the whole index must be read first; truncating
                # before filtering would drop matching memories.
                end = -1 if query else limit - 1
                mem_ids = self._client.zrevrange(index_key, 0, end)
                mems = []
                for mid in mem_ids:
                    data = self._client.hgetall(self._key("mem", mid))
                    if data:
                        mems.append(data)

            if query:
                q = query.lower()
                mems = [
                    m for m in mems
                    if q in str(m.get("content") or "").lower()
                    or q in str(m.get("category") or "").lower()
                ]
            return mems[:limit]
        except Exception as e:
            logger.error(f"Failed to list memories: {e}")
            return []

    def delete_memory(self, memory_id: str) -> bool:
        """Delete a memory from storage and from the memory index."""
        try:
            if self._use_local:
                db = self._load_local()
                memories = db.get("memories", {})
                if memory_id in memories:
                    del memories[memory_id]
                    return self._save_local(db)
                return False

            self._client.delete(self._key("mem", memory_id))
            self._client.zrem(self._key("mem", "index"), memory_id)
            return True
        except Exception as e:
            logger.error(f"Failed to delete memory {memory_id}: {e}")
            return False


# Global singleton instance
redis_client = RedisClient()