Spaces:
Sleeping
Sleeping
Thêm phần xử lý room_id vào code chính (từ nay, id của room phải thêm prefix room-... để hệ thống nhận diện là room id)
91fe7ce | """ | |
| Redis client module — simplified for fetching messages only. | |
| Connects to Redis and retrieves recent messages from a room for summarization. | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import logging | |
| from typing import Optional, Any | |
| import redis | |
| import time | |
| from datetime import datetime | |
| # Thêm path để load config nếu cần | |
| project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) | |
| from .config import REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD, REDIS_KEY_PREFIX | |
| logger = logging.getLogger(__name__) | |
| class RedisClient: | |
| """Singleton Redis client for fetching messages.""" | |
| _instance = None | |
| _client: Optional[redis.Redis] = None | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| return cls._instance | |
| def __init__(self): | |
| if self._client is None: | |
| self._key_prefix = REDIS_KEY_PREFIX | |
| self._use_local = False | |
| self._local_db_path = os.path.join(project_root, "data", "local_db.json") | |
| try: | |
| self._client = redis.Redis( | |
| host=REDIS_HOST, | |
| port=REDIS_PORT, | |
| db=REDIS_DB, | |
| password=REDIS_PASSWORD, | |
| decode_responses=True, | |
| socket_connect_timeout=2, | |
| # Giảm timeout để fallback nhanh hơn | |
| ) | |
| self._client.ping() | |
| logger.info(f"Redis client connected to {REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}") | |
| except Exception: | |
| self._use_local = True | |
| self._client = None | |
| logger.warning(f"Redis connection failed. Using local storage: {self._local_db_path}") | |
| os.makedirs(os.path.dirname(self._local_db_path), exist_ok=True) | |
| if not os.path.exists(self._local_db_path): | |
| with open(self._local_db_path, "w", encoding="utf-8") as f: | |
| json.dump({"messages": {}, "events": {}, "memories": {}}, f) | |
| def _load_local(self) -> dict: | |
| try: | |
| with open(self._local_db_path, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
| except Exception: | |
| return {"messages": {}, "events": {}, "memories": {}} | |
| def _save_local(self, data: dict): | |
| try: | |
| with open(self._local_db_path, "w", encoding="utf-8") as f: | |
| json.dump(data, f, ensure_ascii=False, indent=2) | |
| except Exception as e: | |
| logger.error(f"Failed to save local DB: {e}") | |
| def _key(self, *parts: str) -> str: | |
| """Build a prefixed key.""" | |
| return f"{self._key_prefix}:{':'.join(str(p) for p in parts)}" | |
| def ping(self) -> bool: | |
| """Test Redis connection.""" | |
| if self._use_local: return True | |
| try: | |
| return self._client.ping() | |
| except Exception as e: | |
| logger.error(f"Redis ping failed: {e}") | |
| return False | |
| def get_active_conversations(self) -> list[str]: | |
| """Lấy danh sách các ID phòng/DM đang có dữ liệu trong Redis.""" | |
| try: | |
| if self._use_local: | |
| db = self._load_local() | |
| rooms = list(db.get("messages", {}).keys()) | |
| return rooms | |
| # Quét các key room:messages:* và dm:messages:* hoặc chat:messages:* | |
| patterns = ["room:messages:*", "dm:messages:*", "chat:messages:*"] | |
| all_keys = [] | |
| for p in patterns: | |
| # Thử cả có prefix và không có prefix | |
| all_keys.extend(self._client.keys(p)) | |
| all_keys.extend(self._client.keys(self._key(p))) | |
| # Loại bỏ duplicates và trích xuất ID (phần cuối cùng của key) | |
| conversation_ids = set() | |
| for key in all_keys: | |
| parts = key.split(":") | |
| if parts: | |
| conversation_ids.add(parts[-1]) | |
| return list(conversation_ids) | |
| except Exception as e: | |
| logger.error(f"Failed to get active conversations: {e}") | |
| return [] | |
| def get_chat_history(self, conversation_id: str, limit: int = 100) -> list[dict]: | |
| """Lấy lịch sử chat của một phòng hoặc DM (tự động nhận diện).""" | |
| if conversation_id.startswith("room-"): | |
| room_id = conversation_id[len("room-"):] | |
| return self.get_room_messages(room_id, limit=limit) | |
| return self.get_messages_by_conversation_id(conversation_id, limit=limit) | |
| def save_room_messages(self, room_id: str, messages: list[dict]) -> bool: | |
| """Overwrite messages for a room (used for test seeding).""" | |
| try: | |
| if self._use_local: | |
| db = self._load_local() | |
| db.setdefault("messages", {})[room_id] = messages | |
| self._save_local(db) | |
| return True | |
| key = self._key("room", "messages", room_id) | |
| self._client.delete(key) | |
| for i, msg in enumerate(messages): | |
| msg_id = msg.get("id") or f"{room_id}_{i}" | |
| ts = i | |
| if "timestamp" in msg: | |
| try: | |
| dt = datetime.fromisoformat(msg["timestamp"].replace("Z", "+00:00")) | |
| ts = int(dt.timestamp() * 1000) | |
| except Exception: | |
| ts = int(time.time() * 1000) + i | |
| self._client.zadd(key, {msg_id: ts}) | |
| self._client.hset( | |
| self._key("msg", msg_id), | |
| mapping={k: str(v) for k, v in msg.items()}, | |
| ) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to save room messages: {e}") | |
| return False | |
| def get_room_messages(self, room_id: str, limit: int = 100) -> list[dict]: | |
| """ | |
| Fetch recent messages from a room by scanning msg* hashes filtered by room_id field. | |
| """ | |
| if self._use_local: | |
| db = self._load_local() | |
| msgs = db.get("messages", {}).get(room_id, []) | |
| return msgs[-limit:] | |
| try: | |
| messages = [] | |
| for key in self._client.scan_iter("msg*", count=100): | |
| stored_room_id = self._client.hget(key, "room_id") | |
| if stored_room_id != room_id: | |
| continue | |
| msg_data = self._client.hgetall(key) | |
| if msg_data and msg_data.get("deleted") != "true": | |
| msg_data["_key"] = key | |
| messages.append(msg_data) | |
| messages.sort(key=lambda x: x.get("created_at") or x.get("timestamp", "")) | |
| return messages[-limit:] | |
| except Exception: | |
| return [] | |
| def get_dm_messages(self, dm_id: str, limit: int = 100) -> list[dict]: | |
| """ | |
| Fetch recent messages from a DM conversation. | |
| """ | |
| if self._use_local: | |
| return [] | |
| # DMs often have different key patterns based on the DB inspection | |
| key = f"dm:messages:{dm_id}" | |
| if not self._client.exists(key): | |
| key = self._key("dm", "messages", dm_id) | |
| try: | |
| message_ids = self._client.zrevrange(key, 0, limit - 1) | |
| messages = [] | |
| for msg_id in message_ids: | |
| # Based on inspection, DM message hashes are prefixed with 'dmmsg:' | |
| h_key = f"dmmsg:{msg_id}" | |
| if not self._client.exists(h_key): | |
| h_key = self._key("dmmsg", msg_id) | |
| msg_data = self._client.hgetall(h_key) | |
| if msg_data: | |
| # Map 'sender_id' to 'senderName' if needed by summarizer | |
| if "sender_id" in msg_data and "senderName" not in msg_data: | |
| msg_data["senderName"] = msg_data["sender_id"] | |
| messages.append(msg_data) | |
| messages.reverse() | |
| return messages | |
| except Exception as e: | |
| logger.error(f"Error fetching DM messages: {e}") | |
| return [] | |
| def get_messages_by_conversation_id(self, conversation_id: str, limit: int = 100) -> list[dict]: | |
| """ | |
| Fetch messages that belong to a specific conversation by scanning hashes. | |
| Useful when sorted set index is missing or inconsistent. | |
| If conversation_id starts with "room-", delegates to get_room_messages. | |
| """ | |
| if conversation_id.startswith("room-"): | |
| room_id = conversation_id[len("room-"):] | |
| return self.get_room_messages(room_id, limit=limit) | |
| try: | |
| keys = self._client.keys("dmmsg:*") | |
| messages = [] | |
| for k in keys: | |
| data = self._client.hgetall(k) | |
| if data.get("conversation_id") == conversation_id: | |
| if "sender_id" in data: | |
| data["senderName"] = data["sender_id"] | |
| if "created_at" in data: | |
| data["timestamp"] = data["created_at"] | |
| messages.append(data) | |
| messages.sort(key=lambda x: x.get("timestamp", "")) | |
| return messages[-limit:] | |
| except Exception as e: | |
| logger.error(f"Error fetching messages by conversation_id: {e}") | |
| return [] | |
| # --- Event Management Methods --- | |
| def save_event(self, event_data: dict) -> bool: | |
| """Save an event to Redis or Local JSON.""" | |
| try: | |
| event_id = event_data.get("id") | |
| if not event_id: return False | |
| if self._use_local: | |
| db = self._load_local() | |
| db["events"][event_id] = event_data | |
| self._save_local(db) | |
| return True | |
| # ISO timestamp -> unix timestamp logic | |
| ts = int(datetime.now().timestamp() * 1000) | |
| if "time" in event_data: | |
| try: | |
| dt = datetime.fromisoformat(event_data["time"]) | |
| ts = int(dt.timestamp() * 1000) | |
| except ValueError: pass | |
| event_data["timestamp"] = str(ts) | |
| self._client.hset(self._key("evt", event_id), mapping=event_data) | |
| self._client.zadd(self._key("evt", "index"), {event_id: ts}) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to save event: {e}") | |
| return False | |
| def list_events(self, start_ts: int = 0, end_ts: int = 4000000000000) -> list[dict]: | |
| """List events.""" | |
| try: | |
| if self._use_local: | |
| db = self._load_local() | |
| events = list(db["events"].values()) | |
| filtered = [] | |
| for ev in events: | |
| try: | |
| # Parse time to check against range | |
| dt = datetime.fromisoformat(ev["time"]) | |
| ts = int(dt.timestamp() * 1000) | |
| if start_ts <= ts <= end_ts: | |
| filtered.append(ev) | |
| else: | |
| logger.info(f"Event {ev.get('name')} ({ev.get('time')}) excluded: ts {ts} outside {start_ts}-{end_ts}") | |
| except (ValueError, KeyError, TypeError): | |
| # Fallback: if no time, only include if full range | |
| if start_ts == 0 and end_ts >= 3000000000000: | |
| filtered.append(ev) | |
| return filtered | |
| index_key = self._key("evt", "index") | |
| event_ids = self._client.zrangebyscore(index_key, start_ts, end_ts) | |
| events = [] | |
| for eid in event_ids: | |
| data = self._client.hgetall(self._key("evt", eid)) | |
| if data: events.append(data) | |
| return events | |
| except Exception as e: | |
| logger.error(f"Failed to list events: {e}") | |
| return [] | |
| def delete_event(self, event_id: str) -> bool: | |
| """Xóa sự kiện khỏi Redis và Index.""" | |
| try: | |
| if self._use_local: | |
| db = self._load_local() | |
| if event_id in db["events"]: | |
| del db["events"][event_id] | |
| self._save_local(db) | |
| return True | |
| return False | |
| self._client.delete(self._key("evt", event_id)) | |
| self._client.zrem(self._key("evt", "index"), event_id) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to delete event {event_id}: {e}") | |
| return False | |
| # --- Reminder Methods --- | |
| def save_reminder(self, reminder_data: dict) -> bool: | |
| """Lưu lời nhắc.""" | |
| try: | |
| rem_id = reminder_data.get("id") | |
| ts = int(reminder_data.get("timestamp") or (time.time() * 1000)) | |
| if self._use_local: | |
| db = self._load_local() | |
| db["reminders"][rem_id] = reminder_data | |
| self._save_local(db) | |
| return True | |
| self._client.hset(self._key("rem", rem_id), mapping=reminder_data) | |
| self._client.zadd(self._key("rem", "index"), {rem_id: ts}) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to save reminder: {e}") | |
| return False | |
| def list_reminders(self, limit: int = 50) -> list[dict]: | |
| """Liệt kê lời nhắc.""" | |
| try: | |
| if self._use_local: | |
| db = self._load_local() | |
| return list(db["reminders"].values())[:limit] | |
| rem_ids = self._client.zrevrange(self._key("rem", "index"), 0, limit - 1) | |
| reminders = [] | |
| for rid in rem_ids: | |
| data = self._client.hgetall(self._key("rem", rid)) | |
| if data: reminders.append(data) | |
| return reminders | |
| except Exception as e: | |
| logger.error(f"Failed to list reminders: {e}") | |
| return [] | |
| # --- Decision Log Methods --- | |
| def save_decision(self, room_id: str, decision_data: dict) -> bool: | |
| """Lưu một quyết định vào log của phòng.""" | |
| try: | |
| import json as _json | |
| ts = int(time.time() * 1000) | |
| decision_data["timestamp"] = str(ts) | |
| payload = _json.dumps(decision_data, ensure_ascii=False) | |
| if self._use_local: | |
| db = self._load_local() | |
| db.setdefault("decisions", {}).setdefault(room_id, []) | |
| db["decisions"][room_id].insert(0, decision_data) | |
| self._save_local(db) | |
| return True | |
| self._client.lpush(self._key("room", "decisions", room_id), payload) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to save decision: {e}") | |
| return False | |
| def list_decisions(self, room_id: str, limit: int = 20) -> list[dict]: | |
| """Lấy danh sách quyết định gần nhất của phòng.""" | |
| try: | |
| import json as _json | |
| if self._use_local: | |
| db = self._load_local() | |
| return db.get("decisions", {}).get(room_id, [])[:limit] | |
| raw = self._client.lrange(self._key("room", "decisions", room_id), 0, limit - 1) | |
| return [_json.loads(r) for r in raw] | |
| except Exception as e: | |
| logger.error(f"Failed to list decisions: {e}") | |
| return [] | |
| # --- Task Management Methods --- | |
| def save_task(self, room_id: str, task_data: dict) -> bool: | |
| """Lưu một công việc được giao.""" | |
| try: | |
| import json as _json | |
| ts = int(time.time() * 1000) | |
| task_data.setdefault("status", "pending") | |
| task_data["timestamp"] = str(ts) | |
| payload = _json.dumps(task_data, ensure_ascii=False) | |
| if self._use_local: | |
| db = self._load_local() | |
| db.setdefault("tasks", {}).setdefault(room_id, []) | |
| db["tasks"][room_id].insert(0, task_data) | |
| self._save_local(db) | |
| return True | |
| self._client.lpush(self._key("room", "tasks", room_id), payload) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to save task: {e}") | |
| return False | |
| def list_tasks(self, room_id: str, status: str = None, limit: int = 50) -> list[dict]: | |
| """Liệt kê các công việc trong phòng.""" | |
| try: | |
| import json as _json | |
| if self._use_local: | |
| db = self._load_local() | |
| tasks = db.get("tasks", {}).get(room_id, []) | |
| else: | |
| raw = self._client.lrange(self._key("room", "tasks", room_id), 0, limit - 1) | |
| tasks = [_json.loads(r) for r in raw] | |
| if status: | |
| tasks = [t for t in tasks if t.get("status") == status] | |
| return tasks[:limit] | |
| except Exception as e: | |
| logger.error(f"Failed to list tasks: {e}") | |
| return [] | |
| def update_task_status(self, room_id: str, task_index: int, new_status: str) -> bool: | |
| """Cập nhật trạng thái task theo chỉ số.""" | |
| try: | |
| import json as _json | |
| key = self._key("room", "tasks", room_id) | |
| if self._use_local: | |
| db = self._load_local() | |
| tasks = db.get("tasks", {}).get(room_id, []) | |
| if 0 <= task_index < len(tasks): | |
| tasks[task_index]["status"] = new_status | |
| self._save_local(db) | |
| return True | |
| return False | |
| raw = self._client.lindex(key, task_index) | |
| if not raw: | |
| return False | |
| task = _json.loads(raw) | |
| task["status"] = new_status | |
| self._client.lset(key, task_index, _json.dumps(task, ensure_ascii=False)) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to update task status: {e}") | |
| return False | |
| # --- Reward/Leaderboard Methods --- | |
| def add_reward(self, room_id: str, user_name: str, points: int = 1) -> bool: | |
| """Cộng điểm cho thành viên.""" | |
| try: | |
| if self._use_local: | |
| db = self._load_local() | |
| lb = db.setdefault("leaderboard", {}).setdefault(room_id, {}) | |
| lb[user_name] = lb.get(user_name, 0) + points | |
| self._save_local(db) | |
| return True | |
| self._client.zincrby(self._key("room", "leaderboard", room_id), points, user_name) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to add reward: {e}") | |
| return False | |
| def get_leaderboard(self, room_id: str, top_n: int = 10) -> list[dict]: | |
| """Lấy bảng xếp hạng đóng góp của phòng.""" | |
| try: | |
| if self._use_local: | |
| db = self._load_local() | |
| lb = db.get("leaderboard", {}).get(room_id, {}) | |
| sorted_lb = sorted(lb.items(), key=lambda x: x[1], reverse=True)[:top_n] | |
| return [{"user": u, "points": p} for u, p in sorted_lb] | |
| results = self._client.zrevrange( | |
| self._key("room", "leaderboard", room_id), 0, top_n - 1, withscores=True | |
| ) | |
| return [{"user": u, "points": int(s)} for u, s in results] | |
| except Exception as e: | |
| logger.error(f"Failed to get leaderboard: {e}") | |
| return [] | |
| # --- Memory/Knowledge Management Methods --- | |
| def save_memory(self, memory_data: dict) -> bool: | |
| """Save memory.""" | |
| try: | |
| mem_id = memory_data.get("id") | |
| if not mem_id: return False | |
| ts = int(memory_data.get("timestamp") or (time.time() * 1000)) | |
| memory_data["timestamp"] = str(ts) | |
| if self._use_local: | |
| db = self._load_local() | |
| db["memories"][mem_id] = memory_data | |
| self._save_local(db) | |
| return True | |
| self._client.hset(self._key("mem", mem_id), mapping=memory_data) | |
| self._client.zadd(self._key("mem", "index"), {mem_id: ts}) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to save memory: {e}") | |
| return False | |
| def list_memories(self, query: Optional[str] = None, limit: int = 50) -> list[dict]: | |
| """List memories.""" | |
| try: | |
| if self._use_local: | |
| db = self._load_local() | |
| mems = list(db["memories"].values()) | |
| else: | |
| index_key = self._key("mem", "index") | |
| mem_ids = self._client.zrevrange(index_key, 0, limit - 1) | |
| mems = [] | |
| for mid in mem_ids: | |
| data = self._client.hgetall(self._key("mem", mid)) | |
| if data: mems.append(data) | |
| if query: | |
| q = query.lower() | |
| mems = [m for m in mems if q in m.get("content", "").lower() or q in m.get("category", "").lower()] | |
| return mems[:limit] | |
| except Exception as e: | |
| logger.error(f"Failed to list memories: {e}") | |
| return [] | |
| def delete_memory(self, memory_id: str) -> bool: | |
| """Delete from hash and index.""" | |
| try: | |
| self._client.delete(self._key("mem", memory_id)) | |
| self._client.zrem(self._key("mem", "index"), memory_id) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Failed to delete memory {memory_id}: {e}") | |
| return False | |
| # Global singleton instance | |
| redis_client = RedisClient() | |