quachtiensinh27 commited on
Commit
2571293
·
1 Parent(s): d2e0613

feat: add Redis message fetching and AI summarization pipeline

Browse files
Files changed (2) hide show
  1. redis_client.py +96 -0
  2. tools.py +34 -231
redis_client.py ADDED
@@ -0,0 +1,96 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Redis client module — simplified for fetching messages only.
3
+
4
+ Connects to Redis and retrieves recent messages from a room for summarization.
5
+ """
6
+
7
+ import json
8
+ import logging
9
+ from typing import Optional
10
+
11
+ import redis
12
+
13
+ from .config import REDIS_HOST, REDIS_PORT, REDIS_DB, REDIS_PASSWORD, REDIS_KEY_PREFIX
14
+
15
+ logger = logging.getLogger(__name__)
16
+
17
+
18
+ class RedisClient:
19
+ """Singleton Redis client for fetching messages."""
20
+
21
+ _instance = None
22
+ _client: Optional[redis.Redis] = None
23
+
24
+ def __new__(cls):
25
+ if cls._instance is None:
26
+ cls._instance = super().__new__(cls)
27
+ return cls._instance
28
+
29
+ def __init__(self):
30
+ if self._client is None:
31
+ self._client = redis.Redis(
32
+ host=REDIS_HOST,
33
+ port=REDIS_PORT,
34
+ db=REDIS_DB,
35
+ password=REDIS_PASSWORD,
36
+ decode_responses=True,
37
+ socket_connect_timeout=5,
38
+ socket_timeout=5,
39
+ )
40
+ self._key_prefix = REDIS_KEY_PREFIX
41
+ logger.info(f"Redis client connected to {REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}")
42
+
43
+ def _key(self, *parts: str) -> str:
44
+ """Build a prefixed key."""
45
+ return f"{self._key_prefix}:{':'.join(str(p) for p in parts)}"
46
+
47
+ def ping(self) -> bool:
48
+ """Test Redis connection."""
49
+ try:
50
+ return self._client.ping()
51
+ except Exception as e:
52
+ logger.error(f"Redis ping failed: {e}")
53
+ return False
54
+
55
+ def get_room_messages(self, room_id: str, limit: int = 100) -> list[dict]:
56
+ """
57
+ Fetch recent messages from a room, ordered by timestamp (newest first).
58
+
59
+ Args:
60
+ room_id: Room ID (e.g., "toan-cao-cap")
61
+ limit: Number of messages to fetch (default: 100)
62
+
63
+ Returns:
64
+ List of message dicts with fields:
65
+ - message_id: str
66
+ - thread_id: str (same as room_id)
67
+ - user_name: str
68
+ - content: str
69
+ - timestamp: str
70
+ """
71
+ key = self._key("room", "messages", room_id)
72
+
73
+ # Get message IDs from sorted set (newest first)
74
+ message_ids = self._client.zrevrange(key, 0, limit - 1)
75
+
76
+ messages = []
77
+ for msg_id in message_ids:
78
+ msg_data = self._client.hgetall(self._key("msg", msg_id))
79
+ if msg_data and msg_data.get("deleted") != "true":
80
+ messages.append({
81
+ "message_id": msg_data.get("id", ""),
82
+ "thread_id": msg_data.get("roomId", room_id),
83
+ "user_name": msg_data.get("senderName", "Unknown"),
84
+ "content": msg_data.get("content", ""),
85
+ "timestamp": msg_data.get("timestamp", ""),
86
+ })
87
+
88
+ # Reverse to get chronological order (oldest first)
89
+ messages.reverse()
90
+
91
+ logger.info(f"Fetched {len(messages)} messages from Redis room: {room_id}")
92
+ return messages
93
+
94
+
95
+ # Global singleton instance
96
+ redis_client = RedisClient()
tools.py CHANGED
@@ -8,11 +8,9 @@ Team: Khôi (LangGraph Agent), Hoàng (Database)
8
 
9
  import json
10
  import time
11
- import sqlite3
12
  import logging
13
  from collections import defaultdict
14
  from typing import Any
15
- from pathlib import Path
16
 
17
  import httpx
18
  from langchain_openai import ChatOpenAI
@@ -20,7 +18,8 @@ from langchain_core.output_parsers import JsonOutputParser
20
  from langchain_core.prompts import ChatPromptTemplate
21
  from pydantic import BaseModel, Field
22
 
23
- from .config import QWEN_API_KEY, QWEN_BASE_URL, QWEN_MODEL, LOG_LEVEL, DB_PATH, DB_TYPE, CHROMA_DB_PATH, CHROMA_COLLECTION
 
24
 
25
  logger = logging.getLogger(__name__)
26
 
@@ -201,212 +200,58 @@ def _extract_token_usage(response_metadata: dict) -> dict:
201
  return default_usage
202
 
203
 
204
- def fetch_messages_from_db(db_path: str = None, limit: int = 100) -> list[dict]:
205
- """
206
- Tự động lấy 100 tin nhắn gần nhất từ SQLite database.
207
-
208
- Args:
209
- db_path: Đường dẫn đến file SQLite. Nếu None, dùng DB_PATH từ config.
210
- limit: Số lượng tin nhắn tối đa cần lấy (default: 100).
211
-
212
- Returns:
213
- List of message dicts với 5 fields: message_id, thread_id, user_name, content, timestamp
214
- """
215
- if db_path is None:
216
- db_path = DB_PATH
217
-
218
- if not Path(db_path).exists():
219
- raise FileNotFoundError(f"Database file not found: {db_path}")
220
-
221
- try:
222
- conn = sqlite3.connect(db_path)
223
- conn.row_factory = sqlite3.Row
224
- cursor = conn.cursor()
225
-
226
- # Lấy tin nhắn gần nhất, sort theo timestamp DESC
227
- query = """
228
- SELECT
229
- id as message_id,
230
- thread_id,
231
- user_name,
232
- content,
233
- timestamp
234
- FROM messages
235
- ORDER BY timestamp DESC
236
- LIMIT ?
237
- """
238
- cursor.execute(query, (limit,))
239
- rows = cursor.fetchall()
240
-
241
- # Convert to list of dicts
242
- messages = [
243
- {
244
- "message_id": str(row["message_id"]),
245
- "thread_id": row["thread_id"] or "unknown",
246
- "user_name": row["user_name"] or "Unknown",
247
- "content": row["content"] or "",
248
- "timestamp": row["timestamp"] or ""
249
- }
250
- for row in rows
251
- ]
252
-
253
- # Reverse để sort theo timestamp ASC (thứ tự thời gian đúng)
254
- messages.reverse()
255
-
256
- logger.info(f"Fetched {len(messages)} messages from database: {db_path}")
257
- return messages
258
-
259
- except sqlite3.Error as e:
260
- raise ConnectionError(f"Database error: {e}")
261
- finally:
262
- if conn:
263
- conn.close()
264
-
265
-
266
- def fetch_messages_from_chromadb(
267
- chroma_path: str = None,
268
- collection_name: str = None,
269
- limit: int = 100
270
- ) -> list[dict]:
271
- """
272
- Tự động lấy 100 tin nhắn gần nhất từ ChromaDB.
273
-
274
- Args:
275
- chroma_path: Đường dẫn đến ChromaDB folder. Nếu None, dùng config.
276
- collection_name: Tên collection. Nếu None, dùng config.
277
- limit: Số lượng tin nhắn tối đa (default: 100).
278
-
279
- Returns:
280
- List of message dicts với 5 fields: message_id, thread_id, user_name, content, timestamp
281
- """
282
- import chromadb
283
- from chromadb.config import Settings
284
-
285
- if chroma_path is None:
286
- chroma_path = CHROMA_DB_PATH
287
- if collection_name is None:
288
- collection_name = CHROMA_COLLECTION
289
-
290
- try:
291
- # Connect to persistent ChromaDB
292
- client = chromadb.PersistentClient(path=chroma_path)
293
- collection = client.get_collection(name=collection_name)
294
-
295
- # Get all documents with metadata (limit to recent ones)
296
- # ChromaDB doesn't have timestamp ordering by default, so we get all and sort
297
- results = collection.get(
298
- include=["documents", "metadatas"],
299
- limit=limit * 2 # Get extra in case we need to filter
300
- )
301
-
302
- messages = []
303
- for i, (doc, metadata) in enumerate(zip(results["documents"], results["metadatas"])):
304
- messages.append({
305
- "message_id": metadata.get("message_id", str(i)),
306
- "thread_id": metadata.get("thread_id", "unknown"),
307
- "user_name": metadata.get("user_name", "Unknown"),
308
- "content": doc, # Document is the text content
309
- "timestamp": metadata.get("timestamp", "")
310
- })
311
-
312
- # Sort by timestamp DESC and take top N
313
- messages.sort(key=lambda m: m.get("timestamp", ""), reverse=True)
314
- messages = messages[:limit]
315
-
316
- # Reverse to get ASC order (chronological)
317
- messages.reverse()
318
-
319
- logger.info(f"Fetched {len(messages)} messages from ChromaDB: {chroma_path}")
320
- return messages
321
-
322
- except Exception as e:
323
- raise ConnectionError(f"ChromaDB error: {e}")
324
-
325
-
326
- def fetch_messages(db_type: str = None, limit: int = 100, **kwargs) -> list[dict]:
327
- """
328
- Universal message fetcher — supports both SQLite and ChromaDB.
329
-
330
- Args:
331
- db_type: "sqlite" or "chromadb". Nếu None, dùng config.
332
- limit: Số tin nhắn cần lấy.
333
- **kwargs: Additional params (db_path, chroma_path, collection_name).
334
-
335
- Returns:
336
- List of message dicts.
337
- """
338
- if db_type is None:
339
- db_type = DB_TYPE
340
-
341
- if db_type == "chromadb":
342
- return fetch_messages_from_chromadb(
343
- chroma_path=kwargs.get("chroma_path"),
344
- collection_name=kwargs.get("collection_name"),
345
- limit=limit
346
- )
347
- else:
348
- return fetch_messages_from_db(
349
- db_path=kwargs.get("db_path"),
350
- limit=limit
351
- )
352
-
353
-
354
  def tool_summarize_chat(
355
  messages: list[dict] = None,
356
  limit: int = 100,
357
- db_path: str = None,
358
- db_type: str = None,
359
- chroma_path: str = None,
360
- collection_name: str = None
361
  ) -> dict:
362
  """
363
- Hàm chính xử lệnh /tldr:
364
- 1. Nếu không truyền messages → tự động lấy từ DB (SQLite hoặc ChromaDB)
365
- 2. Tiền xử lý: gom nhóm theo thread_id
 
 
366
  3. Gửi vào Qwen API với prompt chống bịa đặt
367
- 4. Trả về JSON tóm tắt theo hợp đồng dữ liệu của nhóm
368
 
369
  Args:
370
- messages: (Optional) List of message dicts. Nếu None → tự lấy từ DB.
371
- limit: (Optional) Số tin nhắn cần lấy từ DB (default: 100).
372
- db_path: (Optional) Đường dẫn đến file SQLite.
373
- db_type: (Optional) "sqlite" hoặc "chromadb". Nếu None → dùng config.
374
- chroma_path: (Optional) Đường dẫn đến ChromaDB folder.
375
- collection_name: (Optional) Tên collection trong ChromaDB.
376
 
377
  Returns:
378
  {
379
  "status": "success" | "error",
380
- "data": <JSON_Object_from_LLM>,
381
- "metrics": {
382
- "processing_time_sec": <float>,
383
- "token_usage": {"input_tokens": int, "output_tokens": int, "total_tokens": int}
384
- }
385
  }
386
 
387
  Examples:
388
- # Tự động lấy 100 tin từ DB
389
- result = tool_summarize_chat()
390
 
391
- # Tự lấy 50 tin từ DB custom
392
- result = tool_summarize_chat(limit=50, db_path="data/my_db.sqlite")
393
 
394
- # Truyền messages trực tiếp (tương thích ngược)
395
  result = tool_summarize_chat(messages=my_messages)
396
  """
397
  start_time = time.time()
398
 
399
  try:
400
- # Auto-fetch from DB if messages not provided
401
  if messages is None:
402
- logger.info(f"No messages provided, fetching from {db_type or DB_TYPE} database...")
403
- messages = fetch_messages(
404
- db_type=db_type,
405
- limit=limit,
406
- db_path=db_path,
407
- chroma_path=chroma_path,
408
- collection_name=collection_name
409
- )
 
 
 
410
 
411
  # Validate input
412
  if not messages:
@@ -510,54 +355,12 @@ Trả về JSON tóm tắt cho từng thread. Tuân thủ nghiêm ngặt các qu
510
  }
511
 
512
 
513
- # =============================================================================
514
- # SECTION E: BACKWARD COMPATIBILITY — Legacy tools
515
- # =============================================================================
516
-
517
- def search_web(query: str) -> str:
518
- """Search for information on the web (placeholder)."""
519
- return f"Search results for: {query}"
520
-
521
-
522
- def calculate(expression: str) -> str:
523
- """Evaluate a math expression."""
524
- try:
525
- result = eval(expression, {"__builtins__": {}})
526
- return str(result)
527
- except Exception as e:
528
- return f"Error: {e}"
529
-
530
-
531
- def fetch_url(url: str) -> str:
532
- """Fetch content from a URL."""
533
- try:
534
- resp = httpx.get(url, timeout=10, follow_redirects=True)
535
- return resp.text[:2000]
536
- except Exception as e:
537
- return f"Error: {e}"
538
-
539
-
540
- # Tool registry — includes both legacy tools and new /tldr pipeline
541
  TOOLS = {
542
- "search_web": {
543
- "fn": search_web,
544
- "description": "Search for information on the web",
545
- "parameters": {"query": "string"},
546
- },
547
- "calculate": {
548
- "fn": calculate,
549
- "description": "Evaluate a math expression",
550
- "parameters": {"expression": "string"},
551
- },
552
- "fetch_url": {
553
- "fn": fetch_url,
554
- "description": "Fetch content from a URL",
555
- "parameters": {"url": "string"},
556
- },
557
  "summarize_chat": {
558
  "fn": tool_summarize_chat,
559
- "description": "Summarize group chat messages by thread using AI. Use this for /tldr command.",
560
- "parameters": {"messages": "array"},
561
  },
562
  }
563
 
 
8
 
9
  import json
10
  import time
 
11
  import logging
12
  from collections import defaultdict
13
  from typing import Any
 
14
 
15
  import httpx
16
  from langchain_openai import ChatOpenAI
 
18
  from langchain_core.prompts import ChatPromptTemplate
19
  from pydantic import BaseModel, Field
20
 
21
+ from .config import QWEN_API_KEY, QWEN_BASE_URL, QWEN_MODEL, LOG_LEVEL
22
+ from .redis_client import redis_client
23
 
24
  logger = logging.getLogger(__name__)
25
 
 
200
  return default_usage
201
 
202
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
203
  def tool_summarize_chat(
204
  messages: list[dict] = None,
205
  limit: int = 100,
206
+ room_id: str = None,
 
 
 
207
  ) -> dict:
208
  """
209
+ Tóm tắt tin nhắn chat từ Redis theo từng thread.
210
+
211
+ Luồng hoạt động:
212
+ 1. Nếu không truyền messages → lấy từ Redis (cần room_id)
213
+ 2. Gom nhóm tin nhắn theo thread_id
214
  3. Gửi vào Qwen API với prompt chống bịa đặt
215
+ 4. Trả về JSON tóm tắt
216
 
217
  Args:
218
+ messages: (Optional) List of message dicts. Nếu None → lấy từ Redis.
219
+ limit: (Optional) Số tin nhắn tối đa (default: 100).
220
+ room_id: (Optional) Room ID trong Redis. Bắt buộc nếu không truyền messages.
 
 
 
221
 
222
  Returns:
223
  {
224
  "status": "success" | "error",
225
+ "data": {"summary": [{"thread_id", "main_discussion", "status", "conclusion"}]},
226
+ "metrics": {"processing_time_sec": float, "token_usage": {...}}
 
 
 
227
  }
228
 
229
  Examples:
230
+ # Lấy 100 tin từ Redis và tóm tắt
231
+ result = tool_summarize_chat(room_id="toan-cao-cap")
232
 
233
+ # Lấy 50 tin
234
+ result = tool_summarize_chat(room_id="toan-cao-cap", limit=50)
235
 
236
+ # Truyền messages trực tiếp
237
  result = tool_summarize_chat(messages=my_messages)
238
  """
239
  start_time = time.time()
240
 
241
  try:
242
+ # Auto-fetch from Redis if messages not provided
243
  if messages is None:
244
+ if not room_id:
245
+ return {
246
+ "status": "error",
247
+ "data": {"error": "room_id is required when messages is not provided"},
248
+ "metrics": {
249
+ "processing_time_sec": 0.0,
250
+ "token_usage": {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}
251
+ }
252
+ }
253
+ logger.info(f"Fetching {limit} messages from Redis room: {room_id}...")
254
+ messages = redis_client.get_room_messages(room_id, limit)
255
 
256
  # Validate input
257
  if not messages:
 
355
  }
356
 
357
 
358
+ # Tool registry
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
359
  TOOLS = {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
360
  "summarize_chat": {
361
  "fn": tool_summarize_chat,
362
+ "description": "Summarize group chat messages by thread using AI. Fetches from Redis if room_id is provided.",
363
+ "parameters": {"room_id": "string", "limit": "integer"},
364
  },
365
  }
366