chatbot_server / function /agent /pipeline_agent.py
kltn21110's picture
Upload 239 files
325b400 verified
import os,sys
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../"))
sys.path.insert(0, BASE_DIR)
from function.file import extract_file as extract_file
import function.agent.function_call as fc
import function.function_agent.multi_agent as multi_agent
import function.function_agent.map_toolcall as map_toolcall
import function.gemini_response.response_all as response_all
from models.Database_Entity import StopSignal
from advance_shopping.server_java import server_java
async def check_should_stop(chat_id: str, stop_event: object = None):
    """Report whether processing for ``chat_id`` has been asked to stop.

    Two sources are consulted in order, each after a 0.1 s ``asyncio.sleep``:
    the in-memory ``stop_event`` (when one is supplied), then a ``StopSignal``
    record with ``is_stopped=True`` whose ``chat_history`` equals ``chat_id``.

    Args:
        chat_id: Value matched against ``StopSignal.chat_history``.
        stop_event: Optional object exposing ``is_set()``; ignored when falsy.

    Returns:
        ``{"status": "cancelled"}`` if either source signals a stop,
        otherwise ``None``.
    """
    # Stop requested through the in-memory event.
    await asyncio.sleep(0.1)
    if stop_event:
        if stop_event.is_set():
            print("🛑 Dừng qua stop_event.")
            return {"status": "cancelled"}

    # Stop requested through a StopSignal record in the database.
    await asyncio.sleep(0.1)
    db_signal = StopSignal.objects(chat_history=chat_id, is_stopped=True).first()
    if not db_signal:
        # No stop was requested.
        return None
    print("🛑 Dừng vì có StopSignal trong DB.")
    return {"status": "cancelled"}
async def rollback_cart_if_stopped(stop_event, result_multi_task, chat_id: str, user_id: int, token: str):
    """Undo cart/order changes made by shopping tasks after an in-memory stop.

    Nothing happens unless ``stop_event`` is supplied and set. Otherwise, for
    every entry of ``result_multi_task`` whose result reports ``is_shopping``,
    the most recently created ``ChatCart`` of the chat is restored from its
    ``backup_*`` fields, its order is cancelled and its cart items are removed
    through ``server_java``, and its ``order_id`` / ``cart_id`` are cleared.

    Args:
        stop_event: Object exposing ``is_set()``, or ``None``.
        result_multi_task: Mapping of task id to task result. ``None`` or an
            empty mapping means there is nothing to roll back.
        chat_id: Value matched against ``ChatCart.chat_history``.
        user_id: User identifier; converted with ``int()`` for ``server_java``.
        token: Passed through to the ``server_java`` calls.

    Returns:
        None.
    """
    if not (stop_event and stop_event.is_set()):
        return

    # A cancelled run may hand back no result mapping at all.
    for task_id, task_result in (result_multi_task or {}).items():
        # Results without a ``get`` method (e.g. a bare error value) cannot be
        # shopping results, so they are skipped instead of raising.
        read_flag = getattr(task_result, "get", None)
        if not (callable(read_flag) and read_flag("is_shopping")):
            print(f"Task {task_id} không phải mua sắm, không cần rollback.")
            continue

        print(f"INFO: Bắt đầu rollback task mua sắm: {task_id}")
        chat_cart = ChatCart.objects(chat_history=chat_id).order_by('-created_at').first()
        if chat_cart is None:
            # No cart exists for this chat: nothing to restore, and a blank
            # ChatCart document must not be written.
            continue

        # Restore the backed-up state.
        chat_cart.cart_products = chat_cart.backup_cart_products
        chat_cart.status = chat_cart.backup_status
        chat_cart.confirmed_order = chat_cart.backup_confirmed_order
        chat_cart.save()

        cart_id = chat_cart.cart_id
        order_id = chat_cart.order_id

        # Remote cleanup is best-effort: a failure is logged and the local
        # reset below still runs.
        if order_id:
            try:
                server_java.confirm_cancel_AI(int(user_id), token, order_id)
                print(f"✅ Đã huỷ đơn hàng: {order_id}")
            except Exception as e:
                print(f"ERROR khi huỷ đơn hàng {order_id}: {e}")
        if cart_id:
            try:
                server_java.remove_all_cartItem(int(user_id), cart_id, token)
                print(f"✅ Đã xoá item trong giỏ hàng: {cart_id}")
            except Exception as e:
                print(f"ERROR khi xoá cart item {cart_id}: {e}")

        # Clear the IDs so later runs do not act on them again.
        chat_cart.order_id = None
        chat_cart.cart_id = None
        chat_cart.save()
from typing import Optional
import asyncio
from models.Database_Entity import StopSignal,ChatCart
async def multi_query_user(
    user_input: str,
    user_id: int,
    role,
    languages: str,
    chat_id,
    token,
    stop_event: Optional[asyncio.Event] = None
) -> str:
    """Run the full agent pipeline for one user message, honouring stop requests.

    Steps, with a ``check_should_stop`` call between each of them:
      1. Snapshot the latest pending ``ChatCart`` of the chat into its
         ``backup_*`` fields.
      2. Turn the user input into tool calls (``fc.process_user_query``).
      3. Execute the tool calls (``multi_agent.process_toolcalls_with_order``).
      4. Pair tool calls with their results
         (``map_toolcall.map_toolcalls_with_results``).
      5. Build the final answer (``response_all.response_all``).

    Once tool calls have been executed, a detected stop also triggers
    ``rollback_cart_if_stopped`` before returning.

    Args:
        user_input: Raw user message.
        user_id: Identifier of the requesting user.
        role: Forwarded to ``fc.process_user_query``.
        languages: Forwarded to ``fc.process_user_query``.
        chat_id: Chat identifier used for cart and stop-signal lookups.
        token: Forwarded to the tool-call executor and to the rollback.
        stop_event: Optional in-memory cancellation event.

    Returns:
        The truthy value returned by ``check_should_stop`` when a stop is
        detected, otherwise the value produced by
        ``response_all.response_all``, or ``{"status": "empty"}`` when that
        value is ``None``.

    NOTE(review): the ``-> str`` annotation does not cover the dict results
    returned on the cancelled/empty paths; kept unchanged for callers.
    NOTE(review): ``rollback_cart_if_stopped`` only acts when ``stop_event`` is
    set, so a stop detected solely through ``StopSignal`` performs no rollback
    here; confirm whether that is intended.
    """
    # Snapshot the latest pending cart so a cancelled run can be rolled back.
    # When there is no pending cart there is nothing to snapshot, and no blank
    # ChatCart document is created or saved.
    pending_cart = ChatCart.objects(chat_history=chat_id, status="pending").order_by('-created_at').first()
    if pending_cart is not None:
        pending_cart.backup_cart_products = pending_cart.cart_products
        pending_cart.backup_status = pending_cart.status
        pending_cart.backup_confirmed_order = pending_cart.confirmed_order
        pending_cart.save()

    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        return result_check

    multi_tasks = await fc.process_user_query(user_input, user_id, role, languages, chat_id)
    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        return result_check

    result_multi_task = await multi_agent.process_toolcalls_with_order(multi_tasks, token, stop_event, chat_id)

    # Roll back immediately if the in-memory event fired while tools were
    # running; the stop check right below repeats the rollback call when it
    # also reports the stop.
    if stop_event and stop_event.is_set():
        await rollback_cart_if_stopped(stop_event, result_multi_task, chat_id, user_id, token)
    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        await rollback_cart_if_stopped(stop_event, result_multi_task, chat_id, user_id, token)
        return result_check

    map_tool_result = map_toolcall.map_toolcalls_with_results(multi_tasks, result_multi_task)
    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        await rollback_cart_if_stopped(stop_event, result_multi_task, chat_id, user_id, token)
        return result_check

    result = await response_all.response_all(user_input, map_tool_result)
    result_check = await check_should_stop(chat_id, stop_event)
    if result_check:
        await rollback_cart_if_stopped(stop_event, result_multi_task, chat_id, user_id, token)
        return result_check

    return result if result is not None else {"status": "empty"}
# import asyncio
# # if __name__ == "__main__":
# # token = "eyJhbGciOiJIUzI1NiJ9.eyJVc2VySWQiOiI0IiwiUm9sZXMiOiJDVVNUT01FUiIsInNlc3Npb25JZCI6ImJkM2M5MmNkLTczMzgtNDlmNS04NzIyLTUwMDU0Zjk5MjRhZSIsInN1YiI6ImxpbmgiLCJpYXQiOjE3NDg3NjcwNDUsImV4cCI6MTc0OTY2NzA0NX0.X4d4GcstDhaVC1nbPWWkN3sUEHNUltDtD5KgP_mzPrM"
# # result = asyncio.run(multi_query_user("Tôi muốn thêm sản phẩm Yogurt Đào size S vào giỏ hàng và tôi quyết định mua sản phẩm này", 4 , "CUSTOMER", "VN","67d009fee0a638763885c195",token=token))
# # print(result)
# if __name__ == "__main__":
# result = asyncio.run(multi_query_user("Tôi muốn thêm trà chanh size L vào giỏ", 4, "CUSTOMER", "ADMIN","67d2fc6607b51a01e3beb501",""))
# print(result)