092_agent_api / tools /summarizer.py
quachtiensinh27
feat: initialize core agent infrastructure including Redis client, tool directory, scheduling logic, and documentation.
85ff578
"""
Summarization tool using LangChain + LLM.
"""
import time
import logging
from typing import Any, List
from pydantic import BaseModel, Field
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from .base import register_tool, get_llm
from .utils import preprocess_messages, extract_metadata_from_messages
try:
from ..redis_client import redis_client
except (ImportError, ValueError):
from redis_client import redis_client
logger = logging.getLogger(__name__)
# --- Pydantic Schemas ---
class ThreadSummary(BaseModel):
"""Schema cho tóm tắt của một thread."""
thread_id: str = Field(description="ID hoặc tên chủ đề của thread")
main_discussion: str = Field(description="Nội dung chính đang thảo luận")
conversation_flow: str = Field(description="Tóm tắt diễn biến cuộc hội thoại (ai hỏi, ai trả lời, mâu thuẫn/đồng thuận)")
status: str = Field(description="Trạng thái: 'Đã chốt' hoặc 'Chưa chốt'")
conclusion: str = Field(description="Kết luận cuối cùng")
class DecisionObject(BaseModel):
"""Schema cho một quyết định cụ thể."""
content: str = Field(description="Nội dung quyết định vừa chốt")
confirmed_by: list[str] = Field(description="Tên những người đã thống nhất/đồng thuận")
class TaskObject(BaseModel):
"""Schema cho một công việc được giao."""
task: str = Field(description="Nội dung công việc")
assignee: str = Field(description="Người thực hiện (nếu có)")
deadline: str = Field(description="Hạn chót (nếu có đề cập)")
class TLDRResponse(BaseModel):
"""Schema cho response JSON tổng thể."""
summary: list[ThreadSummary] = Field(description="Danh sách tóm tắt các thread")
links_found: list[str] = Field(description="Danh sách các liên kết (URL) được tìm thấy")
files_found: list[dict] = Field(description="Danh sách các tệp tin/ảnh được tìm thấy")
decisions: list[DecisionObject] = Field(default=[], description="Danh sách các quyết định quan trọng vừa chốt (Decision Log)")
assigned_tasks: list[TaskObject] = Field(default=[], description="Danh sách các công việc vừa được giao (Task List)")
# --- System Prompt ---
SYSTEM_PROMPT_TLDR = """
Bạn là một THƯ KÝ DỰ ÁN chuyên nghiệp.
Nhiệm vụ: tóm tắt các đoạn chat nhóm thành báo cáo chính xác tuyệt đối.
- **ƯU TIÊN**: Nếu người dùng có yêu cầu cụ thể (query), bạn phải tập trung giải quyết yêu cầu đó trong phần đầu của `summary`.
- Xác định rõ 'conversation_flow': mô tả cách cuộc thảo luận diễn ra (ví dụ: A đề xuất, B phản đối, cuối cùng cả nhóm đồng ý).
- **QUAN TRỌNG**: Nếu thấy các câu lệnh mang tính chốt hạ như "ok", "duyệt", "thống nhất vậy đi", "chốt cái này"... hãy đưa vào mục `decisions`.
- **QUAN TRỌNG**: Nếu thấy có sự phân công công việc (Ví dụ: "Hùng làm slide nhé", "A check lại giá") -> đưa vào `assigned_tasks`.
- Trả về JSON đúng schema.
"""
# ... (execute_tool part)
@register_tool(
name="summarize_chat",
description="Tóm tắt tin nhắn, trích xuất Quyết định/Công việc và Link/File bằng AI. Có thể tự động lưu lại vào nhật ký nhóm.",
parameters=[
{"name": "room_id", "type": "string", "description": "ID của phòng chat nhóm.", "required": False},
{"name": "dm_id", "type": "string", "description": "ID của cuộc hội thoại cá nhân (DM) theo Sorted Set.", "required": False},
{"name": "conversation_id", "type": "string", "description": "ID của hội thoại (conversation_id) trong dmmsg.", "required": False},
{"name": "query", "type": "string", "description": "Yêu cầu tóm tắt cụ thể (ví dụ: 'Tập trung vào phần deadline').", "required": False},
{"name": "limit", "type": "integer", "description": "Số lượng tin nhắn tối đa (Mặc định: 100).", "required": False},
{"name": "auto_save", "type": "boolean", "description": "Nếu True, sẽ tự động lưu Quyết định và Task vào Database của nhóm.", "required": False}
]
)
def tool_summarize_chat(
messages: List[dict] = None,
limit: int = 100,
room_id: str = None,
dm_id: str = None,
conversation_id: str = None,
query: str = None,
auto_save: bool = False
) -> dict:
start_time = time.time()
try:
if messages is None:
if conversation_id:
messages = redis_client.get_messages_by_conversation_id(conversation_id, limit)
elif dm_id:
messages = redis_client.get_dm_messages(dm_id, limit)
elif room_id:
messages = redis_client.get_room_messages(room_id, limit)
else:
raise ValueError("Either room_id, dm_id or conversation_id is required when messages is not provided")
if not messages:
return {
"status": "success",
"data": {
"summary": [],
"links_found": [],
"files_found": [],
"decisions": [],
"assigned_tasks": []
},
"metrics": {"processing_time_sec": 0}
}
# 1. Trích xuất Metadata (Links/Files)
metadata = extract_metadata_from_messages(messages)
# 2. Tiền xử lý văn bản cho LLM
formatted_threads = preprocess_messages(messages)
llm = get_llm()
parser = JsonOutputParser(pydantic_object=TLDRResponse)
prompt = ChatPromptTemplate.from_messages([
("system", SYSTEM_PROMPT_TLDR),
("human", "Dựa trên các tin nhắn sau, hãy thực hiện yêu cầu này: {query}\n\nNỘI DUNG TIN NHẮN:\n{chat_threads}\n\n{format_instructions}"),
])
chain = prompt | llm | parser
result = chain.invoke({
"chat_threads": formatted_threads,
"query": query or "Tóm tắt tổng quát các nội dung quan trọng.",
"format_instructions": parser.get_format_instructions(),
})
# Merge metadata vào kết quả
result["links_found"] = list(set(result.get("links_found", []) + metadata["links"]))
result["files_found"] = metadata["files"]
# 3. Logic Tự động lưu (Auto-save) và Thưởng điểm (Rewards)
if auto_save and room_id:
for dec in result.get("decisions", []):
redis_client.save_decision(room_id, dec)
# Thưởng điểm cho những người tham gia chốt
for user in dec.get("confirmed_by", []):
redis_client.add_reward(room_id, user, points=2)
for task in result.get("assigned_tasks", []):
redis_client.save_task(room_id, task)
# Thưởng điểm cho người nhận task
if task.get("assignee"):
redis_client.add_reward(room_id, task["assignee"], points=3)
processing_time = round(time.time() - start_time, 2)
return {
"status": "success",
"data": result,
"metrics": {"processing_time_sec": processing_time}
}
except Exception as e:
logger.error(f"Summarizer error: {e}")
return {"status": "error", "data": {"error": str(e)}, "metrics": {"processing_time_sec": 0}}