fix racing issues when sending message
Browse files- app/chat_channel.py +8 -0
- app/message_processor.py +6 -11
app/chat_channel.py
CHANGED
|
@@ -37,6 +37,14 @@ class ChatChannel:
|
|
| 37 |
def invalidate_page_token(self):
|
| 38 |
self.page_token = None
|
| 39 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 40 |
def get_or_create_conversation(self, sender_id: str):
|
| 41 |
if sender_id not in self.conversations:
|
| 42 |
self.conversations[sender_id] = MessageProcessor(self, sender_id)
|
|
|
|
| 37 |
def invalidate_page_token(self):
|
| 38 |
self.page_token = None
|
| 39 |
|
| 40 |
+
def get_sheets_client(self):
|
| 41 |
+
settings = get_settings()
|
| 42 |
+
return SheetsClient(
|
| 43 |
+
settings.google_sheets_credentials_file,
|
| 44 |
+
settings.google_sheets_token_file,
|
| 45 |
+
settings.conversation_sheet_id
|
| 46 |
+
)
|
| 47 |
+
|
| 48 |
def get_or_create_conversation(self, sender_id: str):
|
| 49 |
if sender_id not in self.conversations:
|
| 50 |
self.conversations[sender_id] = MessageProcessor(self, sender_id)
|
app/message_processor.py
CHANGED
|
@@ -44,12 +44,11 @@ class MessageProcessor:
|
|
| 44 |
|
| 45 |
# Get conversation history (run in thread pool)
|
| 46 |
loop = asyncio.get_event_loop()
|
| 47 |
-
sheets_client = self.channel.
|
| 48 |
history = []
|
| 49 |
-
|
| 50 |
-
|
| 51 |
-
|
| 52 |
-
)
|
| 53 |
logger.info(f"[DEBUG] history: {history}")#
|
| 54 |
|
| 55 |
log_kwargs = {
|
|
@@ -99,10 +98,7 @@ class MessageProcessor:
|
|
| 99 |
}
|
| 100 |
else:
|
| 101 |
# 2. Ghi conversation mới NGAY LẬP TỨC với thông tin cơ bản
|
| 102 |
-
|
| 103 |
-
conv = await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**log_kwargs))
|
| 104 |
-
else:
|
| 105 |
-
conv = log_kwargs.copy()
|
| 106 |
if not conv:
|
| 107 |
logger.error("Không thể tạo conversation mới!")
|
| 108 |
return
|
|
@@ -116,8 +112,7 @@ class MessageProcessor:
|
|
| 116 |
if timestamp not in conv['timestamp']:
|
| 117 |
conv['timestamp'].append(timestamp)
|
| 118 |
logger.info(f"[DEBUG] Message history sau update: {conv}")
|
| 119 |
-
|
| 120 |
-
await loop.run_in_executor(None, lambda: self.channel.sheets.log_conversation(**conv))
|
| 121 |
|
| 122 |
# Get page access token (cache)
|
| 123 |
page_token = self.channel.get_page_token()
|
|
|
|
| 44 |
|
| 45 |
# Get conversation history (run in thread pool)
|
| 46 |
loop = asyncio.get_event_loop()
|
| 47 |
+
sheets_client = self.channel.get_sheets_client()
|
| 48 |
history = []
|
| 49 |
+
history = await loop.run_in_executor(
|
| 50 |
+
None, lambda: sheets_client.get_conversation_history(sender_id, page_id)
|
| 51 |
+
)
|
|
|
|
| 52 |
logger.info(f"[DEBUG] history: {history}")#
|
| 53 |
|
| 54 |
log_kwargs = {
|
|
|
|
| 98 |
}
|
| 99 |
else:
|
| 100 |
# 2. Ghi conversation mới NGAY LẬP TỨC với thông tin cơ bản
|
| 101 |
+
conv = await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**log_kwargs))
|
|
|
|
|
|
|
|
|
|
| 102 |
if not conv:
|
| 103 |
logger.error("Không thể tạo conversation mới!")
|
| 104 |
return
|
|
|
|
| 112 |
if timestamp not in conv['timestamp']:
|
| 113 |
conv['timestamp'].append(timestamp)
|
| 114 |
logger.info(f"[DEBUG] Message history sau update: {conv}")
|
| 115 |
+
await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv))
|
|
|
|
| 116 |
|
| 117 |
# Get page access token (cache)
|
| 118 |
page_token = self.channel.get_page_token()
|