File size: 22,006 Bytes
2571293
 
 
 
 
 
ca2ba49
 
2571293
 
ca2ba49
2571293
 
7a4edb9
dd47faf
2571293
ca2ba49
 
2571293
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca2ba49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2571293
 
 
 
 
 
 
ca2ba49
2571293
 
 
 
 
85ff578
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91fe7ce
 
 
 
2571293
8c43ca8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2571293
 
91fe7ce
2571293
ca2ba49
 
 
 
 
 
 
91fe7ce
 
 
 
 
ca2ba49
91fe7ce
ca2ba49
91fe7ce
 
 
ca2ba49
 
2571293
85ff578
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
91fe7ce
85ff578
91fe7ce
 
 
 
85ff578
 
 
 
 
 
 
 
 
 
 
91fe7ce
85ff578
 
 
 
 
 
7a4edb9
 
 
ca2ba49
7a4edb9
 
ca2ba49
7a4edb9
ca2ba49
 
 
 
 
 
 
7a4edb9
 
ca2ba49
 
 
 
7a4edb9
 
ca2ba49
 
7a4edb9
 
ca2ba49
7a4edb9
 
 
ca2ba49
7a4edb9
ca2ba49
 
dd47faf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca2ba49
7a4edb9
 
 
 
 
ca2ba49
7a4edb9
 
ca2ba49
7a4edb9
 
85ff578
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7a4edb9
 
 
ca2ba49
7a4edb9
 
ca2ba49
 
7a4edb9
 
ca2ba49
 
 
 
 
 
7a4edb9
 
 
 
 
 
 
 
 
ca2ba49
7a4edb9
ca2ba49
 
 
 
 
 
 
 
 
 
7a4edb9
ca2ba49
 
 
7a4edb9
ca2ba49
7a4edb9
 
 
 
 
 
 
 
 
 
 
 
 
 
2571293
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
"""
Redis client module — simplified for fetching messages only.

Connects to Redis and retrieves recent messages from a room for summarization.
"""

import os
import sys
import json
import logging
from typing import Optional, Any

import redis
import time
from datetime import datetime

# Thêm path để load config nếu cần
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
from .config import REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD, REDIS_KEY_PREFIX

logger = logging.getLogger(__name__)


class RedisClient:
    """Singleton Redis client for fetching messages."""
    
    _instance = None
    _client: Optional[redis.Redis] = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    
    def __init__(self):
        if self._client is None:
            self._key_prefix = REDIS_KEY_PREFIX
            self._use_local = False
            self._local_db_path = os.path.join(project_root, "data", "local_db.json")
            
            try:
                self._client = redis.Redis(
                    host=REDIS_HOST,
                    port=REDIS_PORT,
                    db=REDIS_DB,
                    password=REDIS_PASSWORD,
                    decode_responses=True,
                    socket_connect_timeout=2,
                    # Giảm timeout để fallback nhanh hơn
                )
                self._client.ping()
                logger.info(f"Redis client connected to {REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}")
            except Exception:
                self._use_local = True
                self._client = None
                logger.warning(f"Redis connection failed. Using local storage: {self._local_db_path}")
                os.makedirs(os.path.dirname(self._local_db_path), exist_ok=True)
                if not os.path.exists(self._local_db_path):
                    with open(self._local_db_path, "w", encoding="utf-8") as f:
                        json.dump({"messages": {}, "events": {}, "memories": {}}, f)

    def _load_local(self) -> dict:
        try:
            with open(self._local_db_path, "r", encoding="utf-8") as f:
                return json.load(f)
        except Exception:
            return {"messages": {}, "events": {}, "memories": {}}

    def _save_local(self, data: dict):
        try:
            with open(self._local_db_path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            logger.error(f"Failed to save local DB: {e}")
    
    def _key(self, *parts: str) -> str:
        """Build a prefixed key."""
        return f"{self._key_prefix}:{':'.join(str(p) for p in parts)}"
    
    def ping(self) -> bool:
        """Test Redis connection."""
        if self._use_local: return True
        try:
            return self._client.ping()
        except Exception as e:
            logger.error(f"Redis ping failed: {e}")
            return False

    def get_active_conversations(self) -> list[str]:
        """Lấy danh sách các ID phòng/DM đang có dữ liệu trong Redis."""
        try:
            if self._use_local:
                db = self._load_local()
                rooms = list(db.get("messages", {}).keys())
                return rooms

            # Quét các key room:messages:* và dm:messages:* hoặc chat:messages:*
            patterns = ["room:messages:*", "dm:messages:*", "chat:messages:*"]
            all_keys = []
            for p in patterns:
                # Thử cả có prefix và không có prefix
                all_keys.extend(self._client.keys(p))
                all_keys.extend(self._client.keys(self._key(p)))
            
            # Loại bỏ duplicates và trích xuất ID (phần cuối cùng của key)
            conversation_ids = set()
            for key in all_keys:
                parts = key.split(":")
                if parts:
                    conversation_ids.add(parts[-1])
            
            return list(conversation_ids)
        except Exception as e:
            logger.error(f"Failed to get active conversations: {e}")
            return []

    def get_chat_history(self, conversation_id: str, limit: int = 100) -> list[dict]:
        """Lấy lịch sử chat của một phòng hoặc DM (tự động nhận diện)."""
        if conversation_id.startswith("room-"):
            room_id = conversation_id[len("room-"):]
            return self.get_room_messages(room_id, limit=limit)
        return self.get_messages_by_conversation_id(conversation_id, limit=limit)
    
    def save_room_messages(self, room_id: str, messages: list[dict]) -> bool:
        """Overwrite messages for a room (used for test seeding)."""
        try:
            if self._use_local:
                db = self._load_local()
                db.setdefault("messages", {})[room_id] = messages
                self._save_local(db)
                return True

            key = self._key("room", "messages", room_id)
            self._client.delete(key)
            for i, msg in enumerate(messages):
                msg_id = msg.get("id") or f"{room_id}_{i}"
                ts = i
                if "timestamp" in msg:
                    try:
                        dt = datetime.fromisoformat(msg["timestamp"].replace("Z", "+00:00"))
                        ts = int(dt.timestamp() * 1000)
                    except Exception:
                        ts = int(time.time() * 1000) + i
                self._client.zadd(key, {msg_id: ts})
                self._client.hset(
                    self._key("msg", msg_id),
                    mapping={k: str(v) for k, v in msg.items()},
                )
            return True
        except Exception as e:
            logger.error(f"Failed to save room messages: {e}")
            return False

    def get_room_messages(self, room_id: str, limit: int = 100) -> list[dict]:
        """
        Fetch recent messages from a room by scanning msg* hashes filtered by room_id field.
        """
        if self._use_local:
            db = self._load_local()
            msgs = db.get("messages", {}).get(room_id, [])
            return msgs[-limit:]

        try:
            messages = []
            for key in self._client.scan_iter("msg*", count=100):
                stored_room_id = self._client.hget(key, "room_id")
                if stored_room_id != room_id:
                    continue
                msg_data = self._client.hgetall(key)
                if msg_data and msg_data.get("deleted") != "true":
                    msg_data["_key"] = key
                    messages.append(msg_data)

            messages.sort(key=lambda x: x.get("created_at") or x.get("timestamp", ""))
            return messages[-limit:]
        except Exception:
            return []

    def get_dm_messages(self, dm_id: str, limit: int = 100) -> list[dict]:
        """
        Fetch recent messages from a DM conversation.
        """
        if self._use_local:
            return []

        # DMs often have different key patterns based on the DB inspection
        key = f"dm:messages:{dm_id}"
        if not self._client.exists(key):
            key = self._key("dm", "messages", dm_id)

        try:
            message_ids = self._client.zrevrange(key, 0, limit - 1)
            messages = []
            for msg_id in message_ids:
                # Based on inspection, DM message hashes are prefixed with 'dmmsg:'
                h_key = f"dmmsg:{msg_id}"
                if not self._client.exists(h_key):
                    h_key = self._key("dmmsg", msg_id)
                
                msg_data = self._client.hgetall(h_key)
                if msg_data:
                    # Map 'sender_id' to 'senderName' if needed by summarizer
                    if "sender_id" in msg_data and "senderName" not in msg_data:
                        msg_data["senderName"] = msg_data["sender_id"]
                    messages.append(msg_data)
            messages.reverse()
            return messages
        except Exception as e:
            logger.error(f"Error fetching DM messages: {e}")
            return []

    def get_messages_by_conversation_id(self, conversation_id: str, limit: int = 100) -> list[dict]:
        """
        Fetch messages that belong to a specific conversation by scanning hashes.
        Useful when sorted set index is missing or inconsistent.
        If conversation_id starts with "room-", delegates to get_room_messages.
        """
        if conversation_id.startswith("room-"):
            room_id = conversation_id[len("room-"):]
            return self.get_room_messages(room_id, limit=limit)

        try:
            keys = self._client.keys("dmmsg:*")
            messages = []
            for k in keys:
                data = self._client.hgetall(k)
                if data.get("conversation_id") == conversation_id:
                    if "sender_id" in data:
                        data["senderName"] = data["sender_id"]
                    if "created_at" in data:
                        data["timestamp"] = data["created_at"]
                    messages.append(data)

            messages.sort(key=lambda x: x.get("timestamp", ""))
            return messages[-limit:]
        except Exception as e:
            logger.error(f"Error fetching messages by conversation_id: {e}")
            return []

    # --- Event Management Methods ---

    def save_event(self, event_data: dict) -> bool:
        """Save an event to Redis or Local JSON."""
        try:
            event_id = event_data.get("id")
            if not event_id: return False
            
            if self._use_local:
                db = self._load_local()
                db["events"][event_id] = event_data
                self._save_local(db)
                return True

            # ISO timestamp -> unix timestamp logic
            ts = int(datetime.now().timestamp() * 1000)
            if "time" in event_data:
                try:
                    dt = datetime.fromisoformat(event_data["time"])
                    ts = int(dt.timestamp() * 1000)
                except ValueError: pass
            
            event_data["timestamp"] = str(ts)
            self._client.hset(self._key("evt", event_id), mapping=event_data)
            self._client.zadd(self._key("evt", "index"), {event_id: ts})
            return True
        except Exception as e:
            logger.error(f"Failed to save event: {e}")
            return False

    def list_events(self, start_ts: int = 0, end_ts: int = 4000000000000) -> list[dict]:
        """List events."""
        try:
            if self._use_local:
                db = self._load_local()
                events = list(db["events"].values())
                filtered = []
                for ev in events:
                    try:
                        # Parse time to check against range
                        dt = datetime.fromisoformat(ev["time"])
                        ts = int(dt.timestamp() * 1000)
                        if start_ts <= ts <= end_ts:
                            filtered.append(ev)
                        else:
                            logger.info(f"Event {ev.get('name')} ({ev.get('time')}) excluded: ts {ts} outside {start_ts}-{end_ts}")
                    except (ValueError, KeyError, TypeError):
                        # Fallback: if no time, only include if full range
                        if start_ts == 0 and end_ts >= 3000000000000:
                            filtered.append(ev)
                return filtered

            index_key = self._key("evt", "index")
            event_ids = self._client.zrangebyscore(index_key, start_ts, end_ts)
            events = []
            for eid in event_ids:
                data = self._client.hgetall(self._key("evt", eid))
                if data: events.append(data)
            return events
        except Exception as e:
            logger.error(f"Failed to list events: {e}")
            return []

    def delete_event(self, event_id: str) -> bool:
        """Xóa sự kiện khỏi Redis và Index."""
        try:
            if self._use_local:
                db = self._load_local()
                if event_id in db["events"]:
                    del db["events"][event_id]
                    self._save_local(db)
                    return True
                return False
            
            self._client.delete(self._key("evt", event_id))
            self._client.zrem(self._key("evt", "index"), event_id)
            return True
        except Exception as e:
            logger.error(f"Failed to delete event {event_id}: {e}")
            return False

    # --- Reminder Methods ---

    def save_reminder(self, reminder_data: dict) -> bool:
        """Lưu lời nhắc."""
        try:
            rem_id = reminder_data.get("id")
            ts = int(reminder_data.get("timestamp") or (time.time() * 1000))
            
            if self._use_local:
                db = self._load_local()
                db["reminders"][rem_id] = reminder_data
                self._save_local(db)
                return True
                
            self._client.hset(self._key("rem", rem_id), mapping=reminder_data)
            self._client.zadd(self._key("rem", "index"), {rem_id: ts})
            return True
        except Exception as e:
            logger.error(f"Failed to save reminder: {e}")
            return False

    def list_reminders(self, limit: int = 50) -> list[dict]:
        """Liệt kê lời nhắc."""
        try:
            if self._use_local:
                db = self._load_local()
                return list(db["reminders"].values())[:limit]
            
            rem_ids = self._client.zrevrange(self._key("rem", "index"), 0, limit - 1)
            reminders = []
            for rid in rem_ids:
                data = self._client.hgetall(self._key("rem", rid))
                if data: reminders.append(data)
            return reminders
        except Exception as e:
            logger.error(f"Failed to list reminders: {e}")
            return []

    # --- Decision Log Methods ---

    def save_decision(self, room_id: str, decision_data: dict) -> bool:
        """Lưu một quyết định vào log của phòng."""
        try:
            import json as _json
            ts = int(time.time() * 1000)
            decision_data["timestamp"] = str(ts)
            payload = _json.dumps(decision_data, ensure_ascii=False)

            if self._use_local:
                db = self._load_local()
                db.setdefault("decisions", {}).setdefault(room_id, [])
                db["decisions"][room_id].insert(0, decision_data)
                self._save_local(db)
                return True

            self._client.lpush(self._key("room", "decisions", room_id), payload)
            return True
        except Exception as e:
            logger.error(f"Failed to save decision: {e}")
            return False

    def list_decisions(self, room_id: str, limit: int = 20) -> list[dict]:
        """Lấy danh sách quyết định gần nhất của phòng."""
        try:
            import json as _json
            if self._use_local:
                db = self._load_local()
                return db.get("decisions", {}).get(room_id, [])[:limit]

            raw = self._client.lrange(self._key("room", "decisions", room_id), 0, limit - 1)
            return [_json.loads(r) for r in raw]
        except Exception as e:
            logger.error(f"Failed to list decisions: {e}")
            return []

    # --- Task Management Methods ---

    def save_task(self, room_id: str, task_data: dict) -> bool:
        """Lưu một công việc được giao."""
        try:
            import json as _json
            ts = int(time.time() * 1000)
            task_data.setdefault("status", "pending")
            task_data["timestamp"] = str(ts)
            payload = _json.dumps(task_data, ensure_ascii=False)

            if self._use_local:
                db = self._load_local()
                db.setdefault("tasks", {}).setdefault(room_id, [])
                db["tasks"][room_id].insert(0, task_data)
                self._save_local(db)
                return True

            self._client.lpush(self._key("room", "tasks", room_id), payload)
            return True
        except Exception as e:
            logger.error(f"Failed to save task: {e}")
            return False

    def list_tasks(self, room_id: str, status: str = None, limit: int = 50) -> list[dict]:
        """Liệt kê các công việc trong phòng."""
        try:
            import json as _json
            if self._use_local:
                db = self._load_local()
                tasks = db.get("tasks", {}).get(room_id, [])
            else:
                raw = self._client.lrange(self._key("room", "tasks", room_id), 0, limit - 1)
                tasks = [_json.loads(r) for r in raw]

            if status:
                tasks = [t for t in tasks if t.get("status") == status]
            return tasks[:limit]
        except Exception as e:
            logger.error(f"Failed to list tasks: {e}")
            return []

    def update_task_status(self, room_id: str, task_index: int, new_status: str) -> bool:
        """Cập nhật trạng thái task theo chỉ số."""
        try:
            import json as _json
            key = self._key("room", "tasks", room_id)

            if self._use_local:
                db = self._load_local()
                tasks = db.get("tasks", {}).get(room_id, [])
                if 0 <= task_index < len(tasks):
                    tasks[task_index]["status"] = new_status
                    self._save_local(db)
                    return True
                return False

            raw = self._client.lindex(key, task_index)
            if not raw:
                return False
            task = _json.loads(raw)
            task["status"] = new_status
            self._client.lset(key, task_index, _json.dumps(task, ensure_ascii=False))
            return True
        except Exception as e:
            logger.error(f"Failed to update task status: {e}")
            return False

    # --- Reward/Leaderboard Methods ---

    def add_reward(self, room_id: str, user_name: str, points: int = 1) -> bool:
        """Cộng điểm cho thành viên."""
        try:
            if self._use_local:
                db = self._load_local()
                lb = db.setdefault("leaderboard", {}).setdefault(room_id, {})
                lb[user_name] = lb.get(user_name, 0) + points
                self._save_local(db)
                return True

            self._client.zincrby(self._key("room", "leaderboard", room_id), points, user_name)
            return True
        except Exception as e:
            logger.error(f"Failed to add reward: {e}")
            return False

    def get_leaderboard(self, room_id: str, top_n: int = 10) -> list[dict]:
        """Lấy bảng xếp hạng đóng góp của phòng."""
        try:
            if self._use_local:
                db = self._load_local()
                lb = db.get("leaderboard", {}).get(room_id, {})
                sorted_lb = sorted(lb.items(), key=lambda x: x[1], reverse=True)[:top_n]
                return [{"user": u, "points": p} for u, p in sorted_lb]

            results = self._client.zrevrange(
                self._key("room", "leaderboard", room_id), 0, top_n - 1, withscores=True
            )
            return [{"user": u, "points": int(s)} for u, s in results]
        except Exception as e:
            logger.error(f"Failed to get leaderboard: {e}")
            return []

    # --- Memory/Knowledge Management Methods ---

    def save_memory(self, memory_data: dict) -> bool:
        """Save memory."""
        try:
            mem_id = memory_data.get("id")
            if not mem_id: return False
            
            ts = int(memory_data.get("timestamp") or (time.time() * 1000))
            memory_data["timestamp"] = str(ts)

            if self._use_local:
                db = self._load_local()
                db["memories"][mem_id] = memory_data
                self._save_local(db)
                return True
            
            self._client.hset(self._key("mem", mem_id), mapping=memory_data)
            self._client.zadd(self._key("mem", "index"), {mem_id: ts})
            return True
        except Exception as e:
            logger.error(f"Failed to save memory: {e}")
            return False

    def list_memories(self, query: Optional[str] = None, limit: int = 50) -> list[dict]:
        """List memories."""
        try:
            if self._use_local:
                db = self._load_local()
                mems = list(db["memories"].values())
            else:
                index_key = self._key("mem", "index")
                mem_ids = self._client.zrevrange(index_key, 0, limit - 1)
                mems = []
                for mid in mem_ids:
                    data = self._client.hgetall(self._key("mem", mid))
                    if data: mems.append(data)
            
            if query:
                q = query.lower()
                mems = [m for m in mems if q in m.get("content", "").lower() or q in m.get("category", "").lower()]
            
            return mems[:limit]
        except Exception as e:
            logger.error(f"Failed to list memories: {e}")
            return []

    def delete_memory(self, memory_id: str) -> bool:
        """Delete from hash and index."""
        try:
            self._client.delete(self._key("mem", memory_id))
            self._client.zrem(self._key("mem", "index"), memory_id)
            return True
        except Exception as e:
            logger.error(f"Failed to delete memory {memory_id}: {e}")
            return False


# Global singleton instance
redis_client = RedisClient()