adjust parallel search and rerank
Browse files- app/facebook.py +36 -16
- app/message_processor.py +14 -14
app/facebook.py
CHANGED
|
@@ -11,7 +11,7 @@ import facebook
|
|
| 11 |
import requests
|
| 12 |
from .config import Settings, get_settings
|
| 13 |
|
| 14 |
-
from .utils import timing_decorator_async, timing_decorator_sync
|
| 15 |
|
| 16 |
class FacebookClient:
|
| 17 |
def __init__(self, app_secret: str, page_id: Optional[str] = None, page_token: Optional[str] = None, sender_id: Optional[str] = None):
|
|
@@ -128,12 +128,19 @@ class FacebookClient:
|
|
| 128 |
"access_token": access_token,
|
| 129 |
"message": message
|
| 130 |
}
|
| 131 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 132 |
try:
|
| 133 |
response = requests.post(url, json=payload, timeout=10)
|
| 134 |
response.raise_for_status()
|
|
|
|
| 135 |
return response.json()
|
| 136 |
except requests.RequestException as e:
|
|
|
|
| 137 |
return {"error": str(e)}
|
| 138 |
|
| 139 |
def _send_message_sync(self, page_access_token: str, recipient_id: str, message: str) -> dict:
|
|
@@ -199,30 +206,43 @@ class FacebookClient:
|
|
| 199 |
async def send_message(self, page_access_token: Optional[str] = None, recipient_id: Optional[str] = None, message: str = "") -> dict:
|
| 200 |
page_access_token = page_access_token or self.page_token
|
| 201 |
recipient_id = recipient_id or self.sender_id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 202 |
if not page_access_token or not recipient_id:
|
|
|
|
| 203 |
raise ValueError("FacebookClient: page_access_token and recipient_id must not be None when sending a message.")
|
| 204 |
|
|
|
|
|
|
|
| 205 |
# Format message
|
| 206 |
-
response_to_send = self.format_message(message.replace('**', '*'))
|
| 207 |
|
| 208 |
# Chia nhỏ nếu quá dài
|
| 209 |
messages = self.split_message(response_to_send)
|
| 210 |
results = []
|
| 211 |
|
| 212 |
-
for
|
| 213 |
-
if len(
|
| 214 |
-
|
| 215 |
|
| 216 |
-
|
| 217 |
-
|
| 218 |
-
|
| 219 |
-
|
| 220 |
-
|
| 221 |
-
|
| 222 |
-
|
| 223 |
-
|
| 224 |
-
|
| 225 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 226 |
|
| 227 |
return results[0] if results else {}
|
| 228 |
|
|
|
|
| 11 |
import requests
|
| 12 |
from .config import Settings, get_settings
|
| 13 |
|
| 14 |
+
from .utils import timing_decorator_async, timing_decorator_sync, _safe_truncate
|
| 15 |
|
| 16 |
class FacebookClient:
|
| 17 |
def __init__(self, app_secret: str, page_id: Optional[str] = None, page_token: Optional[str] = None, sender_id: Optional[str] = None):
|
|
|
|
| 128 |
"access_token": access_token,
|
| 129 |
"message": message
|
| 130 |
}
|
| 131 |
+
|
| 132 |
+
log_payload = payload.copy()
|
| 133 |
+
log_payload['message'] = _safe_truncate(log_payload['message'])
|
| 134 |
+
log_payload['access_token'] = f"{log_payload.get('access_token', '')[:5]}..." if log_payload.get('access_token') else "None"
|
| 135 |
+
|
| 136 |
+
logger.info(f"[FACEBOOK_FORWARDER] Forwarding message to {url} for recipient {recipient_id}. Payload (truncated): {log_payload}")
|
| 137 |
try:
|
| 138 |
response = requests.post(url, json=payload, timeout=10)
|
| 139 |
response.raise_for_status()
|
| 140 |
+
logger.info(f"[FACEBOOK_FORWARDER] Forwarder API returned status {response.status_code}.")
|
| 141 |
return response.json()
|
| 142 |
except requests.RequestException as e:
|
| 143 |
+
logger.error(f"[FACEBOOK_FORWARDER] Error calling forwarder API: {e}")
|
| 144 |
return {"error": str(e)}
|
| 145 |
|
| 146 |
def _send_message_sync(self, page_access_token: str, recipient_id: str, message: str) -> dict:
|
|
|
|
| 206 |
async def send_message(self, page_access_token: Optional[str] = None, recipient_id: Optional[str] = None, message: str = "") -> dict:
|
| 207 |
page_access_token = page_access_token or self.page_token
|
| 208 |
recipient_id = recipient_id or self.sender_id
|
| 209 |
+
|
| 210 |
+
if not message or not str(message).strip():
|
| 211 |
+
logger.warning(f"[FACEBOOK_SEND] Attempted to send an empty or whitespace-only message to recipient {recipient_id}. Aborting.")
|
| 212 |
+
return {}
|
| 213 |
+
|
| 214 |
if not page_access_token or not recipient_id:
|
| 215 |
+
logger.error(f"[FACEBOOK_SEND] Missing page_access_token or recipient_id. Cannot send message.")
|
| 216 |
raise ValueError("FacebookClient: page_access_token and recipient_id must not be None when sending a message.")
|
| 217 |
|
| 218 |
+
logger.info(f"[FACEBOOK_SEND] Preparing to send message to recipient {recipient_id}. Full message (truncated): '{_safe_truncate(str(message))}'")
|
| 219 |
+
|
| 220 |
# Format message
|
| 221 |
+
response_to_send = self.format_message(str(message).replace('**', '*'))
|
| 222 |
|
| 223 |
# Chia nhỏ nếu quá dài
|
| 224 |
messages = self.split_message(response_to_send)
|
| 225 |
results = []
|
| 226 |
|
| 227 |
+
for i, msg_part in enumerate(messages, 1):
|
| 228 |
+
if len(msg_part) > 2000:
|
| 229 |
+
msg_part = msg_part[:2000] # fallback cắt cứng
|
| 230 |
|
| 231 |
+
logger.info(f"[FACEBOOK_SEND] Sending part {i}/{len(messages)} to recipient {recipient_id}.")
|
| 232 |
+
try:
|
| 233 |
+
# Wrap sync HTTP call in thread executor để giữ async
|
| 234 |
+
loop = asyncio.get_event_loop()
|
| 235 |
+
result = await loop.run_in_executor(
|
| 236 |
+
None,
|
| 237 |
+
self.send_message_forwarder,
|
| 238 |
+
page_access_token,
|
| 239 |
+
recipient_id,
|
| 240 |
+
msg_part
|
| 241 |
+
)
|
| 242 |
+
results.append(result)
|
| 243 |
+
except Exception as e:
|
| 244 |
+
logger.error(f"[FACEBOOK_SEND] Failed to send part {i}/{len(messages)} to {recipient_id}. Error: {e}")
|
| 245 |
+
results.append({"error": str(e), "part": i})
|
| 246 |
|
| 247 |
return results[0] if results else {}
|
| 248 |
|
app/message_processor.py
CHANGED
|
@@ -250,11 +250,11 @@ class MessageProcessor:
|
|
| 250 |
asyncio.create_task(self.facebook.send_message(message=get_random_message(FOUND_REGULATIONS_MESSAGES)))
|
| 251 |
|
| 252 |
#TODO: thời gian rerank kéo dài hơn 30s. Tạm thời bỏ qua bước reranking cho đến khi tìm ra phương án optimize
|
| 253 |
-
|
| 254 |
-
|
| 255 |
-
|
| 256 |
-
|
| 257 |
-
|
| 258 |
|
| 259 |
# --- START: Logical Retry Loop for MAX_TOKENS/SAFETY ---
|
| 260 |
max_logical_retries = 3
|
|
@@ -351,7 +351,7 @@ class MessageProcessor:
|
|
| 351 |
lambda: self.channel.supabase.match_documents(
|
| 352 |
embedding=embedding,
|
| 353 |
match_count=match_count,
|
| 354 |
-
user_question=
|
| 355 |
vehicle_keywords=vehicle_keywords
|
| 356 |
)
|
| 357 |
)
|
|
@@ -366,14 +366,14 @@ class MessageProcessor:
|
|
| 366 |
# CẢNH BÁO: Bước này rất tốn kém và làm chậm hệ thống nếu chạy cho mỗi từ khóa.
|
| 367 |
# Việc rerank nhiều lần sẽ làm tăng chi phí và có thể chạm giới hạn API.
|
| 368 |
reranked_matches = matches # Mặc định trả về kết quả gốc nếu rerank bị lỗi hoặc tắt
|
| 369 |
-
|
| 370 |
-
|
| 371 |
-
|
| 372 |
-
|
| 373 |
-
|
| 374 |
-
|
| 375 |
-
|
| 376 |
-
|
| 377 |
|
| 378 |
return reranked_matches
|
| 379 |
except Exception as e:
|
|
|
|
| 250 |
asyncio.create_task(self.facebook.send_message(message=get_random_message(FOUND_REGULATIONS_MESSAGES)))
|
| 251 |
|
| 252 |
#TODO: thời gian rerank kéo dài hơn 30s. Tạm thời bỏ qua bước reranking cho đến khi tìm ra phương án optimize
|
| 253 |
+
try:
|
| 254 |
+
reranked = await self.channel.reranker.rerank(question, matches, top_k=10)
|
| 255 |
+
if reranked: matches = reranked
|
| 256 |
+
except Exception as e:
|
| 257 |
+
logger.error(f"[RERANK] Lỗi khi rerank: {e}")
|
| 258 |
|
| 259 |
# --- START: Logical Retry Loop for MAX_TOKENS/SAFETY ---
|
| 260 |
max_logical_retries = 3
|
|
|
|
| 351 |
lambda: self.channel.supabase.match_documents(
|
| 352 |
embedding=embedding,
|
| 353 |
match_count=match_count,
|
| 354 |
+
user_question=keyword,
|
| 355 |
vehicle_keywords=vehicle_keywords
|
| 356 |
)
|
| 357 |
)
|
|
|
|
| 366 |
# CẢNH BÁO: Bước này rất tốn kém và làm chậm hệ thống nếu chạy cho mỗi từ khóa.
|
| 367 |
# Việc rerank nhiều lần sẽ làm tăng chi phí và có thể chạm giới hạn API.
|
| 368 |
reranked_matches = matches # Mặc định trả về kết quả gốc nếu rerank bị lỗi hoặc tắt
|
| 369 |
+
try:
|
| 370 |
+
# Sử dụng full_query_context để rerank sẽ cho kết quả tốt hơn là chỉ dùng keyword
|
| 371 |
+
reranked = await self.channel.reranker.rerank(keyword, matches, top_k=10)
|
| 372 |
+
if reranked:
|
| 373 |
+
reranked_matches = reranked
|
| 374 |
+
logger.info(f"[SEARCH_RERANK_TASK] Rerank thành công cho từ khóa '{keyword}', còn lại {len(reranked_matches)} kết quả.")
|
| 375 |
+
except Exception as e:
|
| 376 |
+
logger.error(f"[SEARCH_RERANK_TASK] Lỗi khi rerank cho từ khóa '{keyword}': {e}. Sử dụng kết quả gốc.")
|
| 377 |
|
| 378 |
return reranked_matches
|
| 379 |
except Exception as e:
|