092_agent_api / redis_client.py
anhkhoiphan's picture
Thêm phần xử lý room_id vào code chính (từ nay, id của room phải thêm prefix room-... để hệ thống nhận diện là room id)
91fe7ce
"""
Redis client module — simplified for fetching messages only.
Connects to Redis and retrieves recent messages from a room for summarization.
"""
import os
import sys
import json
import logging
from typing import Optional, Any
import redis
import time
from datetime import datetime
# Thêm path để load config nếu cần
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
from .config import REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD, REDIS_KEY_PREFIX
logger = logging.getLogger(__name__)
class RedisClient:
"""Singleton Redis client for fetching messages."""
_instance = None
_client: Optional[redis.Redis] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if self._client is None:
self._key_prefix = REDIS_KEY_PREFIX
self._use_local = False
self._local_db_path = os.path.join(project_root, "data", "local_db.json")
try:
self._client = redis.Redis(
host=REDIS_HOST,
port=REDIS_PORT,
db=REDIS_DB,
password=REDIS_PASSWORD,
decode_responses=True,
socket_connect_timeout=2,
# Giảm timeout để fallback nhanh hơn
)
self._client.ping()
logger.info(f"Redis client connected to {REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}")
except Exception:
self._use_local = True
self._client = None
logger.warning(f"Redis connection failed. Using local storage: {self._local_db_path}")
os.makedirs(os.path.dirname(self._local_db_path), exist_ok=True)
if not os.path.exists(self._local_db_path):
with open(self._local_db_path, "w", encoding="utf-8") as f:
json.dump({"messages": {}, "events": {}, "memories": {}}, f)
def _load_local(self) -> dict:
try:
with open(self._local_db_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {"messages": {}, "events": {}, "memories": {}}
def _save_local(self, data: dict):
try:
with open(self._local_db_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Failed to save local DB: {e}")
def _key(self, *parts: str) -> str:
"""Build a prefixed key."""
return f"{self._key_prefix}:{':'.join(str(p) for p in parts)}"
def ping(self) -> bool:
"""Test Redis connection."""
if self._use_local: return True
try:
return self._client.ping()
except Exception as e:
logger.error(f"Redis ping failed: {e}")
return False
def get_active_conversations(self) -> list[str]:
"""Lấy danh sách các ID phòng/DM đang có dữ liệu trong Redis."""
try:
if self._use_local:
db = self._load_local()
rooms = list(db.get("messages", {}).keys())
return rooms
# Quét các key room:messages:* và dm:messages:* hoặc chat:messages:*
patterns = ["room:messages:*", "dm:messages:*", "chat:messages:*"]
all_keys = []
for p in patterns:
# Thử cả có prefix và không có prefix
all_keys.extend(self._client.keys(p))
all_keys.extend(self._client.keys(self._key(p)))
# Loại bỏ duplicates và trích xuất ID (phần cuối cùng của key)
conversation_ids = set()
for key in all_keys:
parts = key.split(":")
if parts:
conversation_ids.add(parts[-1])
return list(conversation_ids)
except Exception as e:
logger.error(f"Failed to get active conversations: {e}")
return []
def get_chat_history(self, conversation_id: str, limit: int = 100) -> list[dict]:
"""Lấy lịch sử chat của một phòng hoặc DM (tự động nhận diện)."""
if conversation_id.startswith("room-"):
room_id = conversation_id[len("room-"):]
return self.get_room_messages(room_id, limit=limit)
return self.get_messages_by_conversation_id(conversation_id, limit=limit)
def save_room_messages(self, room_id: str, messages: list[dict]) -> bool:
"""Overwrite messages for a room (used for test seeding)."""
try:
if self._use_local:
db = self._load_local()
db.setdefault("messages", {})[room_id] = messages
self._save_local(db)
return True
key = self._key("room", "messages", room_id)
self._client.delete(key)
for i, msg in enumerate(messages):
msg_id = msg.get("id") or f"{room_id}_{i}"
ts = i
if "timestamp" in msg:
try:
dt = datetime.fromisoformat(msg["timestamp"].replace("Z", "+00:00"))
ts = int(dt.timestamp() * 1000)
except Exception:
ts = int(time.time() * 1000) + i
self._client.zadd(key, {msg_id: ts})
self._client.hset(
self._key("msg", msg_id),
mapping={k: str(v) for k, v in msg.items()},
)
return True
except Exception as e:
logger.error(f"Failed to save room messages: {e}")
return False
def get_room_messages(self, room_id: str, limit: int = 100) -> list[dict]:
"""
Fetch recent messages from a room by scanning msg* hashes filtered by room_id field.
"""
if self._use_local:
db = self._load_local()
msgs = db.get("messages", {}).get(room_id, [])
return msgs[-limit:]
try:
messages = []
for key in self._client.scan_iter("msg*", count=100):
stored_room_id = self._client.hget(key, "room_id")
if stored_room_id != room_id:
continue
msg_data = self._client.hgetall(key)
if msg_data and msg_data.get("deleted") != "true":
msg_data["_key"] = key
messages.append(msg_data)
messages.sort(key=lambda x: x.get("created_at") or x.get("timestamp", ""))
return messages[-limit:]
except Exception:
return []
def get_dm_messages(self, dm_id: str, limit: int = 100) -> list[dict]:
"""
Fetch recent messages from a DM conversation.
"""
if self._use_local:
return []
# DMs often have different key patterns based on the DB inspection
key = f"dm:messages:{dm_id}"
if not self._client.exists(key):
key = self._key("dm", "messages", dm_id)
try:
message_ids = self._client.zrevrange(key, 0, limit - 1)
messages = []
for msg_id in message_ids:
# Based on inspection, DM message hashes are prefixed with 'dmmsg:'
h_key = f"dmmsg:{msg_id}"
if not self._client.exists(h_key):
h_key = self._key("dmmsg", msg_id)
msg_data = self._client.hgetall(h_key)
if msg_data:
# Map 'sender_id' to 'senderName' if needed by summarizer
if "sender_id" in msg_data and "senderName" not in msg_data:
msg_data["senderName"] = msg_data["sender_id"]
messages.append(msg_data)
messages.reverse()
return messages
except Exception as e:
logger.error(f"Error fetching DM messages: {e}")
return []
def get_messages_by_conversation_id(self, conversation_id: str, limit: int = 100) -> list[dict]:
"""
Fetch messages that belong to a specific conversation by scanning hashes.
Useful when sorted set index is missing or inconsistent.
If conversation_id starts with "room-", delegates to get_room_messages.
"""
if conversation_id.startswith("room-"):
room_id = conversation_id[len("room-"):]
return self.get_room_messages(room_id, limit=limit)
try:
keys = self._client.keys("dmmsg:*")
messages = []
for k in keys:
data = self._client.hgetall(k)
if data.get("conversation_id") == conversation_id:
if "sender_id" in data:
data["senderName"] = data["sender_id"]
if "created_at" in data:
data["timestamp"] = data["created_at"]
messages.append(data)
messages.sort(key=lambda x: x.get("timestamp", ""))
return messages[-limit:]
except Exception as e:
logger.error(f"Error fetching messages by conversation_id: {e}")
return []
# --- Event Management Methods ---
def save_event(self, event_data: dict) -> bool:
"""Save an event to Redis or Local JSON."""
try:
event_id = event_data.get("id")
if not event_id: return False
if self._use_local:
db = self._load_local()
db["events"][event_id] = event_data
self._save_local(db)
return True
# ISO timestamp -> unix timestamp logic
ts = int(datetime.now().timestamp() * 1000)
if "time" in event_data:
try:
dt = datetime.fromisoformat(event_data["time"])
ts = int(dt.timestamp() * 1000)
except ValueError: pass
event_data["timestamp"] = str(ts)
self._client.hset(self._key("evt", event_id), mapping=event_data)
self._client.zadd(self._key("evt", "index"), {event_id: ts})
return True
except Exception as e:
logger.error(f"Failed to save event: {e}")
return False
def list_events(self, start_ts: int = 0, end_ts: int = 4000000000000) -> list[dict]:
"""List events."""
try:
if self._use_local:
db = self._load_local()
events = list(db["events"].values())
filtered = []
for ev in events:
try:
# Parse time to check against range
dt = datetime.fromisoformat(ev["time"])
ts = int(dt.timestamp() * 1000)
if start_ts <= ts <= end_ts:
filtered.append(ev)
else:
logger.info(f"Event {ev.get('name')} ({ev.get('time')}) excluded: ts {ts} outside {start_ts}-{end_ts}")
except (ValueError, KeyError, TypeError):
# Fallback: if no time, only include if full range
if start_ts == 0 and end_ts >= 3000000000000:
filtered.append(ev)
return filtered
index_key = self._key("evt", "index")
event_ids = self._client.zrangebyscore(index_key, start_ts, end_ts)
events = []
for eid in event_ids:
data = self._client.hgetall(self._key("evt", eid))
if data: events.append(data)
return events
except Exception as e:
logger.error(f"Failed to list events: {e}")
return []
def delete_event(self, event_id: str) -> bool:
"""Xóa sự kiện khỏi Redis và Index."""
try:
if self._use_local:
db = self._load_local()
if event_id in db["events"]:
del db["events"][event_id]
self._save_local(db)
return True
return False
self._client.delete(self._key("evt", event_id))
self._client.zrem(self._key("evt", "index"), event_id)
return True
except Exception as e:
logger.error(f"Failed to delete event {event_id}: {e}")
return False
# --- Reminder Methods ---
def save_reminder(self, reminder_data: dict) -> bool:
"""Lưu lời nhắc."""
try:
rem_id = reminder_data.get("id")
ts = int(reminder_data.get("timestamp") or (time.time() * 1000))
if self._use_local:
db = self._load_local()
db["reminders"][rem_id] = reminder_data
self._save_local(db)
return True
self._client.hset(self._key("rem", rem_id), mapping=reminder_data)
self._client.zadd(self._key("rem", "index"), {rem_id: ts})
return True
except Exception as e:
logger.error(f"Failed to save reminder: {e}")
return False
def list_reminders(self, limit: int = 50) -> list[dict]:
"""Liệt kê lời nhắc."""
try:
if self._use_local:
db = self._load_local()
return list(db["reminders"].values())[:limit]
rem_ids = self._client.zrevrange(self._key("rem", "index"), 0, limit - 1)
reminders = []
for rid in rem_ids:
data = self._client.hgetall(self._key("rem", rid))
if data: reminders.append(data)
return reminders
except Exception as e:
logger.error(f"Failed to list reminders: {e}")
return []
# --- Decision Log Methods ---
def save_decision(self, room_id: str, decision_data: dict) -> bool:
"""Lưu một quyết định vào log của phòng."""
try:
import json as _json
ts = int(time.time() * 1000)
decision_data["timestamp"] = str(ts)
payload = _json.dumps(decision_data, ensure_ascii=False)
if self._use_local:
db = self._load_local()
db.setdefault("decisions", {}).setdefault(room_id, [])
db["decisions"][room_id].insert(0, decision_data)
self._save_local(db)
return True
self._client.lpush(self._key("room", "decisions", room_id), payload)
return True
except Exception as e:
logger.error(f"Failed to save decision: {e}")
return False
def list_decisions(self, room_id: str, limit: int = 20) -> list[dict]:
"""Lấy danh sách quyết định gần nhất của phòng."""
try:
import json as _json
if self._use_local:
db = self._load_local()
return db.get("decisions", {}).get(room_id, [])[:limit]
raw = self._client.lrange(self._key("room", "decisions", room_id), 0, limit - 1)
return [_json.loads(r) for r in raw]
except Exception as e:
logger.error(f"Failed to list decisions: {e}")
return []
# --- Task Management Methods ---
def save_task(self, room_id: str, task_data: dict) -> bool:
"""Lưu một công việc được giao."""
try:
import json as _json
ts = int(time.time() * 1000)
task_data.setdefault("status", "pending")
task_data["timestamp"] = str(ts)
payload = _json.dumps(task_data, ensure_ascii=False)
if self._use_local:
db = self._load_local()
db.setdefault("tasks", {}).setdefault(room_id, [])
db["tasks"][room_id].insert(0, task_data)
self._save_local(db)
return True
self._client.lpush(self._key("room", "tasks", room_id), payload)
return True
except Exception as e:
logger.error(f"Failed to save task: {e}")
return False
def list_tasks(self, room_id: str, status: str = None, limit: int = 50) -> list[dict]:
"""Liệt kê các công việc trong phòng."""
try:
import json as _json
if self._use_local:
db = self._load_local()
tasks = db.get("tasks", {}).get(room_id, [])
else:
raw = self._client.lrange(self._key("room", "tasks", room_id), 0, limit - 1)
tasks = [_json.loads(r) for r in raw]
if status:
tasks = [t for t in tasks if t.get("status") == status]
return tasks[:limit]
except Exception as e:
logger.error(f"Failed to list tasks: {e}")
return []
def update_task_status(self, room_id: str, task_index: int, new_status: str) -> bool:
"""Cập nhật trạng thái task theo chỉ số."""
try:
import json as _json
key = self._key("room", "tasks", room_id)
if self._use_local:
db = self._load_local()
tasks = db.get("tasks", {}).get(room_id, [])
if 0 <= task_index < len(tasks):
tasks[task_index]["status"] = new_status
self._save_local(db)
return True
return False
raw = self._client.lindex(key, task_index)
if not raw:
return False
task = _json.loads(raw)
task["status"] = new_status
self._client.lset(key, task_index, _json.dumps(task, ensure_ascii=False))
return True
except Exception as e:
logger.error(f"Failed to update task status: {e}")
return False
# --- Reward/Leaderboard Methods ---
def add_reward(self, room_id: str, user_name: str, points: int = 1) -> bool:
"""Cộng điểm cho thành viên."""
try:
if self._use_local:
db = self._load_local()
lb = db.setdefault("leaderboard", {}).setdefault(room_id, {})
lb[user_name] = lb.get(user_name, 0) + points
self._save_local(db)
return True
self._client.zincrby(self._key("room", "leaderboard", room_id), points, user_name)
return True
except Exception as e:
logger.error(f"Failed to add reward: {e}")
return False
def get_leaderboard(self, room_id: str, top_n: int = 10) -> list[dict]:
"""Lấy bảng xếp hạng đóng góp của phòng."""
try:
if self._use_local:
db = self._load_local()
lb = db.get("leaderboard", {}).get(room_id, {})
sorted_lb = sorted(lb.items(), key=lambda x: x[1], reverse=True)[:top_n]
return [{"user": u, "points": p} for u, p in sorted_lb]
results = self._client.zrevrange(
self._key("room", "leaderboard", room_id), 0, top_n - 1, withscores=True
)
return [{"user": u, "points": int(s)} for u, s in results]
except Exception as e:
logger.error(f"Failed to get leaderboard: {e}")
return []
# --- Memory/Knowledge Management Methods ---
def save_memory(self, memory_data: dict) -> bool:
"""Save memory."""
try:
mem_id = memory_data.get("id")
if not mem_id: return False
ts = int(memory_data.get("timestamp") or (time.time() * 1000))
memory_data["timestamp"] = str(ts)
if self._use_local:
db = self._load_local()
db["memories"][mem_id] = memory_data
self._save_local(db)
return True
self._client.hset(self._key("mem", mem_id), mapping=memory_data)
self._client.zadd(self._key("mem", "index"), {mem_id: ts})
return True
except Exception as e:
logger.error(f"Failed to save memory: {e}")
return False
def list_memories(self, query: Optional[str] = None, limit: int = 50) -> list[dict]:
"""List memories."""
try:
if self._use_local:
db = self._load_local()
mems = list(db["memories"].values())
else:
index_key = self._key("mem", "index")
mem_ids = self._client.zrevrange(index_key, 0, limit - 1)
mems = []
for mid in mem_ids:
data = self._client.hgetall(self._key("mem", mid))
if data: mems.append(data)
if query:
q = query.lower()
mems = [m for m in mems if q in m.get("content", "").lower() or q in m.get("category", "").lower()]
return mems[:limit]
except Exception as e:
logger.error(f"Failed to list memories: {e}")
return []
def delete_memory(self, memory_id: str) -> bool:
"""Delete from hash and index."""
try:
self._client.delete(self._key("mem", memory_id))
self._client.zrem(self._key("mem", "index"), memory_id)
return True
except Exception as e:
logger.error(f"Failed to delete memory {memory_id}: {e}")
return False
# Global singleton instance
redis_client = RedisClient()