fix racing issues when sending message
Browse files- app/chat_channel.py +8 -4
- app/main.py +4 -5
- app/message_processor.py +22 -6
- app/reranker.py +3 -14
app/chat_channel.py
CHANGED
|
@@ -25,10 +25,9 @@ class ChatChannel:
|
|
| 25 |
base_url=settings.gemini_base_url,
|
| 26 |
model=settings.gemini_models_list[0] if settings.gemini_models_list else "gemini-2.5-flash"
|
| 27 |
)
|
| 28 |
-
self.
|
| 29 |
-
self.reranker = Reranker(facebook_client=self.facebook)
|
| 30 |
self.embedder = EmbeddingClient()
|
| 31 |
-
self.
|
| 32 |
|
| 33 |
def get_page_token(self, force_refresh=False):
|
| 34 |
if self.page_token is None or force_refresh:
|
|
@@ -36,4 +35,9 @@ class ChatChannel:
|
|
| 36 |
return self.page_token
|
| 37 |
|
| 38 |
def invalidate_page_token(self):
|
| 39 |
-
self.page_token = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 25 |
base_url=settings.gemini_base_url,
|
| 26 |
model=settings.gemini_models_list[0] if settings.gemini_models_list else "gemini-2.5-flash"
|
| 27 |
)
|
| 28 |
+
self.reranker = Reranker()
|
|
|
|
| 29 |
self.embedder = EmbeddingClient()
|
| 30 |
+
self.conversations = {} # sender_id -> MessageProcessor
|
| 31 |
|
| 32 |
def get_page_token(self, force_refresh=False):
|
| 33 |
if self.page_token is None or force_refresh:
|
|
|
|
| 35 |
return self.page_token
|
| 36 |
|
| 37 |
def invalidate_page_token(self):
|
| 38 |
+
self.page_token = None
|
| 39 |
+
|
| 40 |
+
def get_or_create_conversation(self, sender_id: str):
|
| 41 |
+
if sender_id not in self.conversations:
|
| 42 |
+
self.conversations[sender_id] = MessageProcessor(self, sender_id)
|
| 43 |
+
return self.conversations[sender_id]
|
app/main.py
CHANGED
|
@@ -181,13 +181,12 @@ async def webhook(request: Request):
|
|
| 181 |
if not message_data:
|
| 182 |
return {"status": "ok"}
|
| 183 |
|
| 184 |
-
# --- Refactor: Lấy page_id,
|
| 185 |
page_id = message_data.get("page_id")
|
| 186 |
-
|
| 187 |
-
# Lấy hoặc tạo ChatChannel
|
| 188 |
channel = channel_manager.get_or_create_channel("facebook", page_id)
|
| 189 |
-
|
| 190 |
-
await
|
| 191 |
return {"status": "ok"}
|
| 192 |
except Exception as e:
|
| 193 |
logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}")
|
|
|
|
| 181 |
if not message_data:
|
| 182 |
return {"status": "ok"}
|
| 183 |
|
| 184 |
+
# --- Refactor: Lấy page_id, sender_id, channel, conversation, gọi message_processor ---
|
| 185 |
page_id = message_data.get("page_id")
|
| 186 |
+
sender_id = message_data.get("sender_id")
|
|
|
|
| 187 |
channel = channel_manager.get_or_create_channel("facebook", page_id)
|
| 188 |
+
conversation = channel.get_or_create_conversation(sender_id)
|
| 189 |
+
await conversation.process_message(message_data)
|
| 190 |
return {"status": "ok"}
|
| 191 |
except Exception as e:
|
| 192 |
logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}")
|
app/message_processor.py
CHANGED
|
@@ -2,12 +2,21 @@ from typing import Dict, Any, List
|
|
| 2 |
import asyncio
|
| 3 |
import traceback
|
| 4 |
from loguru import logger
|
| 5 |
-
from .constants import SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES
|
| 6 |
from .utils import get_random_message
|
|
|
|
| 7 |
|
| 8 |
class MessageProcessor:
|
| 9 |
-
def __init__(self, channel):
|
| 10 |
self.channel = channel
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 11 |
|
| 12 |
async def process_message(self, message_data: Dict[str, Any]):
|
| 13 |
# Refactor logic từ main.py vào đây
|
|
@@ -120,7 +129,7 @@ class MessageProcessor:
|
|
| 120 |
return
|
| 121 |
# Gửi message Facebook, nếu lỗi token expired thì invalidate và thử lại một lần
|
| 122 |
try:
|
| 123 |
-
await self.
|
| 124 |
except Exception as e:
|
| 125 |
if "expired" in str(e).lower():
|
| 126 |
logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
|
|
@@ -181,7 +190,7 @@ class MessageProcessor:
|
|
| 181 |
# 6. Gửi response và cập nhật final state
|
| 182 |
# Replace all occurrences of '**' with '*' before sending
|
| 183 |
response_to_send = response.replace('**', '*') if isinstance(response, str) else response
|
| 184 |
-
await self.
|
| 185 |
if hasattr(self.channel, 'sheets'):
|
| 186 |
await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**conv))
|
| 187 |
return
|
|
@@ -256,10 +265,17 @@ class MessageProcessor:
|
|
| 256 |
async def format_search_results(self, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
|
| 257 |
if not matches:
|
| 258 |
return "Không tìm thấy kết quả phù hợp."
|
| 259 |
-
await self.
|
| 260 |
try:
|
| 261 |
reranked = await self.channel.reranker.rerank(question, matches, top_k=5)
|
| 262 |
if reranked:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 263 |
matches = reranked
|
| 264 |
except Exception as e:
|
| 265 |
logger.error(f"[RERANK] Lỗi khi rerank: {e}")
|
|
@@ -328,7 +344,7 @@ class MessageProcessor:
|
|
| 328 |
"\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
|
| 329 |
f"\n\nCâu hỏi của người dùng: {question}\n"
|
| 330 |
)
|
| 331 |
-
await self.
|
| 332 |
try:
|
| 333 |
answer = await self.channel.llm.generate_text(prompt)
|
| 334 |
if answer and answer.strip():
|
|
|
|
| 2 |
import asyncio
|
| 3 |
import traceback
|
| 4 |
from loguru import logger
|
| 5 |
+
from .constants import SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES, BATCH_STATUS_MESSAGES
|
| 6 |
from .utils import get_random_message
|
| 7 |
+
from .facebook import FacebookClient
|
| 8 |
|
| 9 |
class MessageProcessor:
|
| 10 |
+
def __init__(self, channel, sender_id):
|
| 11 |
self.channel = channel
|
| 12 |
+
self.sender_id = sender_id
|
| 13 |
+
# FacebookClient riêng cho từng conversation
|
| 14 |
+
self.facebook = FacebookClient(
|
| 15 |
+
app_secret=channel.llm.gemini_client.limit_manager.get_settings().facebook_app_secret,
|
| 16 |
+
page_id=channel.page_id,
|
| 17 |
+
page_token=channel.get_page_token(),
|
| 18 |
+
sender_id=sender_id
|
| 19 |
+
)
|
| 20 |
|
| 21 |
async def process_message(self, message_data: Dict[str, Any]):
|
| 22 |
# Refactor logic từ main.py vào đây
|
|
|
|
| 129 |
return
|
| 130 |
# Gửi message Facebook, nếu lỗi token expired thì invalidate và thử lại một lần
|
| 131 |
try:
|
| 132 |
+
await self.facebook.send_message(message=get_random_message(PROCESSING_STATUS_MESSAGES))
|
| 133 |
except Exception as e:
|
| 134 |
if "expired" in str(e).lower():
|
| 135 |
logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
|
|
|
|
| 190 |
# 6. Gửi response và cập nhật final state
|
| 191 |
# Replace all occurrences of '**' with '*' before sending
|
| 192 |
response_to_send = response.replace('**', '*') if isinstance(response, str) else response
|
| 193 |
+
await self.facebook.send_message(message=response_to_send)
|
| 194 |
if hasattr(self.channel, 'sheets'):
|
| 195 |
await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**conv))
|
| 196 |
return
|
|
|
|
| 265 |
async def format_search_results(self, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
|
| 266 |
if not matches:
|
| 267 |
return "Không tìm thấy kết quả phù hợp."
|
| 268 |
+
await self.facebook.send_message(message=get_random_message(FOUND_REGULATIONS_MESSAGES))
|
| 269 |
try:
|
| 270 |
reranked = await self.channel.reranker.rerank(question, matches, top_k=5)
|
| 271 |
if reranked:
|
| 272 |
+
# Gửi Facebook message sau khi hoàn thành
|
| 273 |
+
if self.facebook:
|
| 274 |
+
try:
|
| 275 |
+
message = get_random_message(BATCH_STATUS_MESSAGES)
|
| 276 |
+
await self.facebook.send_message(message=f"... {message} ...")
|
| 277 |
+
except Exception as e:
|
| 278 |
+
logger.error(f"[RERANK][FACEBOOK] Error sending batch message: {e}")
|
| 279 |
matches = reranked
|
| 280 |
except Exception as e:
|
| 281 |
logger.error(f"[RERANK] Lỗi khi rerank: {e}")
|
|
|
|
| 344 |
"\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
|
| 345 |
f"\n\nCâu hỏi của người dùng: {question}\n"
|
| 346 |
)
|
| 347 |
+
await self.facebook.send_message(message=f"... {get_random_message(SUMMARY_STATUS_MESSAGES)} ...")
|
| 348 |
try:
|
| 349 |
answer = await self.channel.llm.generate_text(prompt)
|
| 350 |
if answer and answer.strip():
|
app/reranker.py
CHANGED
|
@@ -5,11 +5,11 @@ from loguru import logger
|
|
| 5 |
import asyncio
|
| 6 |
import hashlib
|
| 7 |
import time
|
| 8 |
-
from .constants import BATCH_STATUS_MESSAGES
|
| 9 |
-
from .utils import get_random_message
|
| 10 |
|
| 11 |
class Reranker:
|
| 12 |
-
def __init__(self
|
| 13 |
settings = get_settings()
|
| 14 |
self.provider = getattr(settings, 'rerank_provider', settings.llm_provider)
|
| 15 |
self.model = getattr(settings, 'rerank_model', settings.llm_model)
|
|
@@ -21,14 +21,11 @@ class Reranker:
|
|
| 21 |
# self.client = CohereClient(settings.cohere_api_key, model=self.model)
|
| 22 |
else:
|
| 23 |
raise NotImplementedError(f"Rerank provider {self.provider} not supported yet.")
|
| 24 |
-
self.facebook_client = facebook_client
|
| 25 |
-
|
| 26 |
# Cải thiện cache với TTL và quản lý memory
|
| 27 |
self._rerank_cache = {}
|
| 28 |
self._cache_ttl = 3600 # 1 giờ
|
| 29 |
self._max_cache_size = 200 # Tăng cache size
|
| 30 |
self._cache_timestamps = {}
|
| 31 |
-
|
| 32 |
# Sử dụng max_docs_to_rerank từ config
|
| 33 |
self.max_docs_to_rerank = settings.max_docs_to_rerank
|
| 34 |
|
|
@@ -244,14 +241,6 @@ class Reranker:
|
|
| 244 |
doc['rerank_score'] = 0
|
| 245 |
scored.append(doc)
|
| 246 |
|
| 247 |
-
# Gửi Facebook message sau khi hoàn thành
|
| 248 |
-
if self.facebook_client:
|
| 249 |
-
try:
|
| 250 |
-
message = get_random_message(BATCH_STATUS_MESSAGES)
|
| 251 |
-
await self.facebook_client.send_message(message=f"... {message} ...")
|
| 252 |
-
except Exception as e:
|
| 253 |
-
logger.error(f"[RERANK][FACEBOOK] Error sending batch message: {e}")
|
| 254 |
-
|
| 255 |
# Sort theo score và trả về top_k
|
| 256 |
scored = sorted(scored, key=lambda x: x['rerank_score'], reverse=True)
|
| 257 |
result = scored[:top_k]
|
|
|
|
| 5 |
import asyncio
|
| 6 |
import hashlib
|
| 7 |
import time
|
| 8 |
+
# from .constants import BATCH_STATUS_MESSAGES
|
| 9 |
+
# from .utils import get_random_message
|
| 10 |
|
| 11 |
class Reranker:
|
| 12 |
+
def __init__(self):
|
| 13 |
settings = get_settings()
|
| 14 |
self.provider = getattr(settings, 'rerank_provider', settings.llm_provider)
|
| 15 |
self.model = getattr(settings, 'rerank_model', settings.llm_model)
|
|
|
|
| 21 |
# self.client = CohereClient(settings.cohere_api_key, model=self.model)
|
| 22 |
else:
|
| 23 |
raise NotImplementedError(f"Rerank provider {self.provider} not supported yet.")
|
|
|
|
|
|
|
| 24 |
# Cải thiện cache với TTL và quản lý memory
|
| 25 |
self._rerank_cache = {}
|
| 26 |
self._cache_ttl = 3600 # 1 giờ
|
| 27 |
self._max_cache_size = 200 # Tăng cache size
|
| 28 |
self._cache_timestamps = {}
|
|
|
|
| 29 |
# Sử dụng max_docs_to_rerank từ config
|
| 30 |
self.max_docs_to_rerank = settings.max_docs_to_rerank
|
| 31 |
|
|
|
|
| 241 |
doc['rerank_score'] = 0
|
| 242 |
scored.append(doc)
|
| 243 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 244 |
# Sort theo score và trả về top_k
|
| 245 |
scored = sorted(scored, key=lambda x: x['rerank_score'], reverse=True)
|
| 246 |
result = scored[:top_k]
|